Add progress and error reporting API (#650)
* Add reporting API and tests

* Add reporting in COCO, VOC and YOLO formats for import and export

* Update changelog
Maxim Zhiltsov authored Feb 18, 2022
1 parent fed65ab commit 6070d05
Showing 17 changed files with 1,072 additions and 394 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
to create smaller datasets from bigger ones
(<https://github.com/openvinotoolkit/datumaro/pull/636>,
<https://github.com/openvinotoolkit/datumaro/pull/640>)
- API to report dataset import and export progress;
API to report dataset import and export errors and take action (skip, fail)
(supported in COCO, VOC and YOLO formats)
(<https://github.com/openvinotoolkit/datumaro/pull/650>)
- Support for downloading the ImageNetV2 and COCO datasets
(<https://github.com/openvinotoolkit/datumaro/pull/653>,
<https://github.com/openvinotoolkit/datumaro/pull/659>)
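
For context, a minimal usage sketch of the import-side API described in this changelog entry (not part of this commit; `SkipBrokenItems` is a hypothetical policy, and the `_handle_item_error` hook name is assumed to mirror the export-side `ExportErrorPolicy` shown in `converter.py` below):

```python
import datumaro as dm

class SkipBrokenItems(dm.ImportErrorPolicy):
    # Assumed hook: returning, instead of calling self.fail(), means
    # "skip the offending item and continue the import".
    def _handle_item_error(self, error):
        print("Skipping broken item:", error)

# Passing error_policy (or progress_reporter) implies eager loading.
dataset = dm.Dataset.import_from("path/to/coco_dataset", "coco",
    error_policy=SkipBrokenItems())
```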
12 changes: 9 additions & 3 deletions datumaro/__init__.py
@@ -12,20 +12,26 @@
PointsCategories, Polygon, PolyLine, RgbColor, RleMask,
)
from .components.cli_plugin import CliPlugin
from .components.converter import Converter
from .components.converter import (
Converter, ExportErrorPolicy, FailingExportErrorPolicy,
)
from .components.dataset import (
Dataset, DatasetPatch, DatasetSubset, IDataset, ItemStatus, eager_mode,
)
from .components.environment import Environment, PluginRegistry
from .components.extractor import (
DEFAULT_SUBSET_NAME, CategoriesInfo, DatasetItem, Extractor, IExtractor,
Importer, ItemTransform, SourceExtractor, Transform,
DEFAULT_SUBSET_NAME, CategoriesInfo, DatasetItem, Extractor,
FailingImportErrorPolicy, IExtractor, Importer, ImportErrorPolicy,
ItemTransform, SourceExtractor, Transform,
)
from .components.hl_ops import ( # pylint: disable=redefined-builtin
export, filter, merge, run_model, transform, validate,
)
from .components.launcher import Launcher, ModelTransform
from .components.media import ByteImage, Image, MediaElement, Video, VideoFrame
from .components.media_manager import MediaManager
from .components.progress_reporting import (
NullProgressReporter, ProgressReporter,
)
from .components.validator import Validator
from .version import VERSION
82 changes: 77 additions & 5 deletions datumaro/components/converter.py
@@ -1,21 +1,87 @@
# Copyright (C) 2019-2021 Intel Corporation
# Copyright (C) 2019-2022 Intel Corporation
#
# SPDX-License-Identifier: MIT

from tempfile import mkdtemp
from typing import Union
from typing import NoReturn, Optional, Tuple, TypeVar, Union
import logging as log
import os
import os.path as osp
import shutil

from attrs import define, field
import attr

from datumaro.components.cli_plugin import CliPlugin
from datumaro.components.extractor import DatasetItem
from datumaro.components.errors import (
AnnotationExportError, DatumaroError, ItemExportError,
)
from datumaro.components.extractor import DatasetItem, IExtractor
from datumaro.components.media import Image
from datumaro.components.progress_reporting import (
NullProgressReporter, ProgressReporter,
)
from datumaro.util.meta_file_util import save_meta_file
from datumaro.util.os_util import rmtree
from datumaro.util.scope import on_error_do, scoped

T = TypeVar('T')

class _ExportFail(DatumaroError):
pass

class ExportErrorPolicy:
def report_item_error(self, error: Exception, *,
item_id: Tuple[str, str]) -> None:
"""
Allows reporting a problem with a dataset item.
If this function returns, the converter must skip the item.
"""

if not isinstance(error, _ExportFail):
ie = ItemExportError(item_id)
ie.__cause__ = error
return self._handle_item_error(ie)
else:
raise error

def report_annotation_error(self, error: Exception, *,
item_id: Tuple[str, str]) -> None:
"""
Allows reporting a problem with a dataset item annotation.
If this function returns, the converter must skip the annotation.
"""

if not isinstance(error, _ExportFail):
ie = AnnotationExportError(item_id)
ie.__cause__ = error
return self._handle_annotation_error(ie)
else:
raise error

def _handle_item_error(self, error: ItemExportError) -> None:
"""This function must either call fail() or return."""
self.fail(error)

def _handle_annotation_error(self, error: AnnotationExportError) -> None:
"""This function must either call fail() or return."""
self.fail(error)

def fail(self, error: Exception) -> NoReturn:
raise _ExportFail from error

class FailingExportErrorPolicy(ExportErrorPolicy):
pass

@define(eq=False)
class ExportContext:
progress_reporter: ProgressReporter = field(default=None,
converter=attr.converters.default_if_none(factory=NullProgressReporter))
error_policy: ExportErrorPolicy = field(default=None,
converter=attr.converters.default_if_none(factory=FailingExportErrorPolicy))

class NullExportContext(ExportContext):
pass
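
As an illustration (not part of the diff), a lenient policy that logs and skips failed items or annotations instead of aborting the export could be built on these hooks; an `ExportContext` is then assembled the same way `Dataset.export()` does in `dataset.py` below:

```python
import logging

from datumaro.components.converter import ExportContext, ExportErrorPolicy

class LenientExportErrorPolicy(ExportErrorPolicy):
    def _handle_item_error(self, error):
        # Returning (instead of calling self.fail(error)) tells the
        # converter to skip the item.
        logging.warning("Skipping item: %s", error)

    def _handle_annotation_error(self, error):
        logging.warning("Skipping annotation: %s", error)

ctx = ExportContext(error_policy=LenientExportErrorPolicy())
```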

class Converter(CliPlugin):
DEFAULT_IMAGE_EXT = None
@@ -71,8 +71,12 @@ def patch(cls, dataset, patch, save_dir, **options):
def apply(self):
raise NotImplementedError("Should be implemented in a subclass")

def __init__(self, extractor, save_dir, save_images=False,
image_ext=None, default_image_ext=None, save_dataset_meta=False):
def __init__(self, extractor: IExtractor, save_dir: str, *,
save_images: bool = False,
image_ext: Optional[str] = None,
default_image_ext: Optional[str] = None,
save_dataset_meta: bool = False,
ctx: Optional[ExportContext] = None):
default_image_ext = default_image_ext or self.DEFAULT_IMAGE_EXT
assert default_image_ext
self._default_image_ext = default_image_ext
@@ -93,6 +93,8 @@ def __init__(self, extractor, save_dir, save_images=False,
else:
self._patch = None

self._ctx: ExportContext = ctx or NullExportContext()

def _find_image_ext(self, item: Union[DatasetItem, Image]):
src_ext = None

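Based on the docstrings above, a converter routes per-item failures through `self._ctx.error_policy`; a hedged sketch with a hypothetical format converter (`_write_item` is a made-up helper):

```python
from datumaro.components.converter import Converter

class MyFormatConverter(Converter):
    DEFAULT_IMAGE_EXT = '.jpg'

    def apply(self):
        for item in self._extractor:
            try:
                self._write_item(item)  # hypothetical per-item writer
            except Exception as e:
                # If report_item_error() returns, the item must be skipped;
                # the default FailingExportErrorPolicy re-raises instead.
                self._ctx.error_policy.report_item_error(e,
                    item_id=(item.id, item.subset))
```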
160 changes: 139 additions & 21 deletions datumaro/components/dataset.py
@@ -15,9 +15,13 @@
import logging as log
import os
import os.path as osp
import warnings

from datumaro.components.annotation import AnnotationType, LabelCategories
from datumaro.components.converter import Converter
from datumaro.components.config_model import Source
from datumaro.components.converter import (
Converter, ExportContext, ExportErrorPolicy, _ExportFail,
)
from datumaro.components.dataset_filter import (
XPathAnnotationsFilter, XPathDatasetFilter,
)
@@ -29,9 +29,33 @@
)
from datumaro.components.extractor import (
DEFAULT_SUBSET_NAME, CategoriesInfo, DatasetItem, Extractor, IExtractor,
ItemTransform, Transform,
ImportContext, ImportErrorPolicy, ItemTransform, Transform, _ImportFail,
)
from datumaro.components.launcher import Launcher, ModelTransform
from datumaro.components.progress_reporting import (
NullProgressReporter, ProgressReporter,
)
from datumaro.plugins.transforms import ProjectLabels
from datumaro.util import is_method_redefined
from datumaro.util.log_utils import logging_disabled
@@ -877,7 +884,9 @@ def flush_changes(self):
self._data.flush_changes()

@scoped
def export(self, save_dir: str, format: Union[str, Type[Converter]],
def export(self, save_dir: str, format: Union[str, Type[Converter]], *,
progress_reporter: Optional[ProgressReporter] = None,
error_policy: Optional[ExportErrorPolicy] = None,
**kwargs) -> None:
"""
Saves the dataset in some format.
@@ -887,7 +896,9 @@ def export(self, save_dir: str, format: Union[str, Type[Converter]],
format - The desired output format.
If a string is passed, it is treated as a plugin name,
which is searched for in the dataset environment.
**kwargs - Parameters for the export format
progress_reporter - An object to report progress
error_policy - An object to report format-related errors
**kwargs - Parameters for the format
"""

if not save_dir:
@@ -910,13 +921,59 @@
inplace = False
os.makedirs(save_dir, exist_ok=True)

if not inplace:
converter.convert(self, save_dir=save_dir, **kwargs)
if not self.is_bound:
self.bind(save_dir, format, options=copy(kwargs))
self.flush_changes()
else:
converter.patch(self, self.get_patch(), save_dir=save_dir, **kwargs)
has_ctx_args = progress_reporter is not None or error_policy is not None

if not progress_reporter:
progress_reporter = NullProgressReporter()

assert 'ctx' not in kwargs
converter_kwargs = copy(kwargs)
converter_kwargs['ctx'] = ExportContext(
progress_reporter=progress_reporter,
error_policy=error_policy)

try:
if not inplace:
try:
converter.convert(self, save_dir=save_dir,
**converter_kwargs)
except TypeError as e:
# TODO: for backward compatibility. To be removed after 0.3
if "unexpected keyword argument 'ctx'" not in str(e):
raise

if has_ctx_args:
warnings.warn("It seems that '%s' converter "
"does not support progress and error reporting, "
"it will be disabled" % format,
DeprecationWarning)
converter_kwargs.pop('ctx')

converter.convert(self, save_dir=save_dir,
**converter_kwargs)
else:
try:
converter.patch(self, self.get_patch(), save_dir=save_dir,
**converter_kwargs)
except TypeError as e:
# TODO: for backward compatibility. To be removed after 0.3
if "unexpected keyword argument 'ctx'" not in str(e):
raise

if has_ctx_args:
warnings.warn("It seems that '%s' converter "
"does not support progress and error reporting, "
"it will be disabled" % format,
DeprecationWarning)
converter_kwargs.pop('ctx')

converter.patch(self, self.get_patch(), save_dir=save_dir,
**converter_kwargs)
except _ExportFail as e:
raise e.__cause__

self.bind(save_dir, format, options=copy(kwargs))
self.flush_changes()
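
A hedged usage sketch of the export side (the output directory and top-level `datumaro` exports are assumed; `LenientExportErrorPolicy` is the illustrative policy from the `converter.py` sketch above). Converters that do not yet accept the `ctx` argument fall back to the old call with a `DeprecationWarning`, as implemented here:

```python
import datumaro as dm

dataset = dm.Dataset.from_iterable([
    dm.DatasetItem(id='1', annotations=[dm.Label(0)]),
], categories=['object'])

dataset.export("export_dir/", "voc",
    error_policy=LenientExportErrorPolicy(),      # from the earlier sketch
    progress_reporter=dm.NullProgressReporter())  # stand-in; any ProgressReporter works
```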

def save(self, save_dir: Optional[str] = None, **kwargs) -> None:
options = dict(self._options)
@@ -931,8 +988,27 @@ def load(cls, path: str, **kwargs) -> Dataset:

@classmethod
def import_from(cls, path: str, format: Optional[str] = None, *,
env: Optional[Environment] = None, **kwargs) -> Dataset:
from datumaro.components.config_model import Source
env: Optional[Environment] = None,
progress_reporter: Optional[ProgressReporter] = None,
error_policy: Optional[ImportErrorPolicy] = None,
**kwargs) -> Dataset:
"""
Creates a `Dataset` instance from a dataset on the disk.
Args:
path - The input file or directory path
format - Dataset format.
If a string is passed, it is treated as a plugin name,
which is searched for in the `env` plugin context.
If not set, will try to detect automatically,
using the `env` plugin context.
env - A plugin collection. If not set, the built-in plugins are used
progress_reporter - An object to report progress.
Implies eager loading.
error_policy - An object to report format-related errors.
Implies eager loading.
**kwargs - Parameters for the format
"""

if env is None:
env = Environment()
@@ -952,17 +1028,59 @@ def import_from(cls, path: str, format: Optional[str] = None, *,
else:
raise UnknownFormatError(format)

extractors = []
for src_conf in detected_sources:
if not isinstance(src_conf, Source):
src_conf = Source(src_conf)
extractors.append(env.make_extractor(
src_conf.format, src_conf.url, **src_conf.options
))
# TODO: probably, should not be available in lazy mode, because it
# becomes unreliable and error-prone. For progress reporting it
# makes little sense, because loading stage is spread over other
# operations. Error reporting is going to be unreliable.
has_ctx_args = progress_reporter is not None or error_policy is not None
eager = has_ctx_args

if not progress_reporter:
progress_reporter = NullProgressReporter()
pbars = progress_reporter.split(len(detected_sources))

try:
extractors = []
for src_conf, pbar in zip(detected_sources, pbars):
if not isinstance(src_conf, Source):
src_conf = Source(src_conf)

extractor_kwargs = dict(src_conf.options)

assert 'ctx' not in extractor_kwargs
extractor_kwargs['ctx'] = ImportContext(
progress_reporter=pbar,
error_policy=error_policy)

try:
extractors.append(env.make_extractor(
src_conf.format, src_conf.url, **extractor_kwargs
))
except TypeError as e:
# TODO: for backward compatibility. To be removed after 0.3
if "unexpected keyword argument 'ctx'" not in str(e):
raise

if has_ctx_args:
warnings.warn("It seems that '%s' extractor "
"does not support progress and error reporting, "
"it will be disabled" % src_conf.format,
DeprecationWarning)
extractor_kwargs.pop('ctx')

extractors.append(env.make_extractor(
src_conf.format, src_conf.url, **extractor_kwargs
))

dataset = cls.from_extractors(*extractors, env=env)
if eager:
dataset.init_cache()
except _ImportFail as e:
raise e.__cause__

dataset = cls.from_extractors(*extractors, env=env)
dataset._source_path = path
dataset._format = format

return dataset
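
Similarly, on the import side, passing either reporting argument switches `import_from()` to eager loading (the cache is initialized immediately), per the logic above; a sketch with placeholder paths:

```python
import datumaro as dm

dataset = dm.Dataset.import_from("path/to/yolo_dataset", "yolo",
    progress_reporter=dm.NullProgressReporter())  # stand-in; a real reporter would receive updates
```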

@staticmethod