Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Modifications for variable Audio and Sampler Generalisation to different Download Tasks #110

Merged
merged 12 commits into from
Nov 18, 2021
33 changes: 23 additions & 10 deletions hearpreprocess/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
download_file,
new_basedir,
str2int,
safecopy,
)

INCLUDE_DATESTR_IN_FINAL_PATHS = False
Expand Down Expand Up @@ -665,9 +666,10 @@ def run(self):
# duration
# sample duration is specified in the task config.
# The specified sample duration is in seconds
metadata = self.trim_event_metadata(
metadata, duration=self.task_config["sample_duration"]
)
if self.task_config["sample_duration"] is not None:
khumairraj marked this conversation as resolved.
Show resolved Hide resolved
metadata = self.trim_event_metadata(
metadata, duration=self.task_config["sample_duration"]
)
else:
raise ValueError(
"%s embedding_type unknown" % self.task_config["embedding_type"]
Expand Down Expand Up @@ -818,6 +820,14 @@ def run(self):
# minutes or the timestamp embeddings will explode
sample_duration = self.task_config["sample_duration"]
max_split_duration = self.get_max_split_duration()
if sample_duration is None:
assert max_split_duration is None, (
"If the sample duration is set to None i.e. orignal audio files "
"are being used without any trimming or padding, then the "
"max_split_duration should also be None, so that no "
"subsampling is done as the audio file length is not "
"consistent."
)

# If max_split_duration is not None set the max_files so that
# the total duration of all the audio files after subsampling
Expand Down Expand Up @@ -919,15 +929,18 @@ def requires(self):

def run(self):
self.createsplit()

for audiofile in tqdm(list(self.requires()["corpus"].splitdir.iterdir())):
newaudiofile = self.splitdir.joinpath(f"{audiofile.stem}.wav")
audio_util.trim_pad_wav(
str(audiofile),
str(newaudiofile),
duration=self.task_config["sample_duration"],
)

if self.task_config["sample_duration"] is not None:
audio_util.trim_pad_wav(
str(audiofile),
str(newaudiofile),
duration=self.task_config["sample_duration"],
)
else:
# If the sample_duration is None, the file will be copied
# without any trimming or padding
safecopy(src=audiofile, dst=newaudiofile)
self.mark_complete()


Expand Down
63 changes: 38 additions & 25 deletions hearpreprocess/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import random
import shutil
from pathlib import Path
from typing import Optional, Dict, Any
from typing import Callable, Optional, Dict, Any
from urllib.parse import urlparse
import tempfile
import copy

import click
import luigi
Expand All @@ -30,6 +31,7 @@
from hearpreprocess import dcase2016_task2, nsynth_pitch, speech_commands, spoken_digit
from hearpreprocess.util.luigi import WorkTask
import hearpreprocess.util.audio as audio_util
import hearpreprocess.util.luigi as luigi_util

logger = logging.getLogger("luigi-interface")
# Currently the sampler is only allowed to run for open tasks
Expand Down Expand Up @@ -59,45 +61,36 @@
"task_config": dcase2016_task2.generic_task_config,
"audio_sample_size": 4,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
turian marked this conversation as resolved.
Show resolved Hide resolved
},
"nsynth_pitch": {
"task_config": nsynth_pitch.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"speech_commands": {
"task_config": speech_commands.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": pipeline.get_download_and_extract_tasks,
},
"spoken_digit": {
"task_config": spoken_digit.generic_task_config,
"audio_sample_size": 100,
"necessary_keys": [],
"get_download_and_extract_tasks": tfds_pipeline.get_download_and_extract_tasks_tfds,
},
# Add the sampler config for the secrets task if the secret task config was found.
# Not available for participants
**secret_config,
}


class RandomSampleOriginalDataset(WorkTask):
class _RandomSampleOriginalDataset(WorkTask):
necessary_keys = luigi.ListParameter()
audio_sample_size = luigi.IntParameter()

def requires(self):
# If this is a TensorFlow dataset then use the tfds pipeline
if "tfds_task_name" in self.task_config:
return tfds_pipeline.get_download_and_extract_tasks_tfds(self.task_config)

return pipeline.get_download_and_extract_tasks(self.task_config)

@staticmethod
def safecopy(src, dst):
# Make sure the parent destination directory exists
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)

@staticmethod
def trimcopy_audio(src, tmp_dst, fin_dst, small_duration):
"""
Expand Down Expand Up @@ -167,7 +160,9 @@ def run(self):

# Copy all the non audio files
for file in tqdm(copy_files):
self.safecopy(src=copy_from.joinpath(file), dst=copy_to.joinpath(file))
luigi_util.safecopy(
src=copy_from.joinpath(file), dst=copy_to.joinpath(file)
)

# Save all the audio after trimming them to small sample duration
# The small sample duration(in seconds) is specified in the small
Expand All @@ -189,6 +184,29 @@ def run(self):
shutil.make_archive(copy_to, "zip", copy_to)


def get_sampler_task(sampler_config: Dict[str, Any]) -> _RandomSampleOriginalDataset:
"""
Returns a task to do sampling after downloading the dataset with
download and extract tasks from the dataset specific
`get_download_and_extract_tasks` function
"""
_task_config: Dict[str, Any] = copy.deepcopy(sampler_config["task_config"])
_task_config["mode"] = _task_config["default_mode"]
_get_download_and_extract_tasks: Callable = sampler_config[
"get_download_and_extract_tasks"
]

class RandomSampleOriginalDataset(_RandomSampleOriginalDataset):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have RandomSampleOriginalDataset and _RandomSampleOriginalDataset? Why can't we just have only RandomSampleOriginalDataset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason to do this here like this is - The RandomSampleOriginalDataset requires the get_download_and_extract_task, which is a function and is specific to the task. This returns the task to download and extract the task. So the _RandomSampleOriginalDataset is overridden and named as RandomSampleOriginalDataset and the tasks are added. I cannot refer to both the global variable _RandomSampleOriginalDataset and local variable RandomSampleOriginalDataset inside the get_sampler_task function with the same name. So I had to make different names for them.

task_config = _task_config
audio_sample_size = sampler_config["audio_sample_size"]
necessary_keys = sampler_config["necessary_keys"]

def requires(self):
return _get_download_and_extract_tasks(self.task_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrmmm really? I mean this is the sampler so it's not core code, it's just for testing, but this seems wrong and smells bad.

requires() in luigi is supposed to return a Luigi task. Not do work. run() is where work should occur. Otherwise you might have weird luigi bugs that are hard to debug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_download_and_extract_tasks returns the tasks which will download and extract. So technically, the requires is still returning a list of tasks. This is how our main pipeline is working where we use this function to build the task and then pass it in the ExtractMetadata as luigiParameter and put it in the requires. As discussed in the previous comment, the main reason to do this here, is that the _get_download_and_extract_tasks is different for different tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


return RandomSampleOriginalDataset


@click.command()
@click.argument("task")
@click.option(
Expand All @@ -202,15 +220,10 @@ def main(task: str, num_workers: Optional[int] = None):
if num_workers is None:
num_workers = multiprocessing.cpu_count()
logger.info(f"Using {num_workers} workers")
config: Dict[str, Any] = configs[task]
default_config: str = config["task_config"]["default_mode"]
config["task_config"]["mode"] = default_config
sampler = RandomSampleOriginalDataset(
task_config=config["task_config"],
audio_sample_size=config["audio_sample_size"],
necessary_keys=config["necessary_keys"],
)
pipeline.run(sampler, num_workers=num_workers)

sampler_config: Dict[str, Any] = configs[task]
sampler = get_sampler_task(sampler_config)
pipeline.run(sampler(), num_workers=num_workers)


if __name__ == "__main__":
Expand Down
3 changes: 3 additions & 0 deletions hearpreprocess/util/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def get_audio_dir_stats(
all_file_paths,
)
)
if len(audio_paths) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait why would this happen? Why is this okay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no audio in the downloaded directory, we should return nothing. This can happen, for example, if a downloaded directory has just metadata files, we still need to put that in the requires of the ExtractMetadata, so that the task can actually run. This function, the one here, runs on all the downloaded directories in the requires. So, this is not a problem as we still have another assert below, So that if audio files are found and the stats are not calculated, the assert will throw, if audio files are not found, which is in this case, it will return the empty dict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print("No audio files present in the folder")
return {}
rng = random.Random(0)
rng.shuffle(audio_paths)

Expand Down
11 changes: 11 additions & 0 deletions hearpreprocess/util/luigi.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os.path
from functools import partial
from pathlib import Path
import shutil

import luigi
import requests
Expand Down Expand Up @@ -219,3 +220,13 @@ def str2int(s: str) -> int:
https://stackoverflow.com/a/16008760/82733
"""
return int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % (2 ** 32 - 1)


def safecopy(src, dst):
"""
Copies a file after checking if the parent destination directory exists
If the parent doesnot exists, the parent directory will be made and the
file will be copied
"""
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
14 changes: 13 additions & 1 deletion hearpreprocess/util/task_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ def validate_generic_task_config(
"embedding_type": Or("scene", "event", str),
"prediction_type": Or("multiclass", "multilabel", str),
"split_mode": Or("trainvaltest", "presplit_kfold", "new_split_kfold"),
"sample_duration": Or(float, int),
# When the sample duration is None, the original audio is retained
# without any trimming and padding
"sample_duration": Or(float, int, None),
"evaluation": Schema([str]),
"default_mode": Or("5h", "50h", "full", str),
}
Expand Down Expand Up @@ -177,6 +179,12 @@ def validate_generic_task_config(
): object,
}
)
# If the sample duration is set to None, the max_task_duration_by_split
# should also be None and no subsampling will be done
if task_config["sample_duration"] is None:
schema["max_task_duration_by_split"] = Schema(
{split: Or(int, float, None) for split in SPLITS}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None, not int, float, None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

)
elif split_mode in ["presplit_kfold", "new_split_kfold"]:

assert (
Expand All @@ -203,6 +211,10 @@ def validate_generic_task_config(
): object,
}
)
# If the sample duration is set to None, the max_task_duration_by_fold
# should also be None and no subsampling will be done
if task_config["sample_duration"] is None:
schema["max_task_duration_by_fold"] = None
else:
raise ValueError("Invalid split_mode")

Expand Down