diff --git a/class_config/colored_robots.yaml b/class_config/colored_robots.yaml index e23a2b7..8ba9b6a 100644 --- a/class_config/colored_robots.yaml +++ b/class_config/colored_robots.yaml @@ -1,4 +1,4 @@ -squeeze_classes: +group_classes: - robot_red - robot_blue - robot_unknown diff --git a/class_config/default.yaml b/class_config/default.yaml index a3bcb76..83f72fd 100644 --- a/class_config/default.yaml +++ b/class_config/default.yaml @@ -1,2 +1,2 @@ -squeeze_classes: +group_classes: surrogate_class: "" diff --git a/scripts/old_dataset_script.py b/scripts/old_dataset_script.py new file mode 100644 index 0000000..641ec39 --- /dev/null +++ b/scripts/old_dataset_script.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 + +import os +import yaml +import cv2 +import random +import argparse +import numpy as np + + +# Available classes for YOEO +CLASSES = { + 'bb_classes': ['ball', 'robot'], + 'segmentation_classes': ['field edge', 'lines'], + 'ignored_classes': ['goalpost', 'obstacle', 'L-Intersection', 'X-Intersection', 'T-Intersection'] + } + +#ROBOT_CLASSES = ["robot_red", "robot_blue", "robot_unknown"] +#ROBOT_NUMBER = [None, 1, 2, 3, 4, 5, 6] +ROBOT_COLOR_NUMBERS = [ + "red_None", "red_1", "red_2", "red_3", "red_4", "red_5", "red_6", + "blue_None", "blue_1", "blue_2", "blue_3", "blue_4", "blue_5", "blue_6", + "unknown_None", "unknown_1", "unknown_2", "unknown_3", "unknown_4", "unknown_5", "unknown_6" + ] + +""" +This script reads annotations in the expected yaml format below +to generate the corresponding yolo .txt files and the segmentation masks. + + +Expected YAML format (Example): +=============================== + +Please refer to the TORSO-21 documentation for this: https://github.com/bit-bots/TORSO_21_dataset#structure + + +Expects following file tree (Example): +====================================== + +We expect to be given a subdirectory of the structure documented here: https://github.com/bit-bots/TORSO_21_dataset#structure + + # TORSO-21 -> reality|simulation -> train|test +├── annotations.yaml +├── images +│ ├── image1.jpg +│ ├── image2.png +│ └── ... +└── segmentations + ├── image1.png + ├── image2.png + └── ... + +Produces the following file tree (Example): +=========================================== + + # TORSO-21 -> reality|simulation -> train|test +├── train.txt +├── test.txt +├── yoeo.names +├── yoeo.data +├── images # Images already exist in dataset; symlinks are created in destination-dir case +│ ├── image1.jpg +│ ├── image2.png +│ └── ... +├── labels +│ ├── image1.txt +│ ├── image2.txt +│ └── ... +└── segmentations + ├── image1.png + ├── image2.png + └── ... + +with train.txt and test.txt containing absolute image-paths for training and evaluation respectively +with yoeo.names containing the class names of bounding boxes +with yoeo.data: containing number of bounding box classes as well as absolute path to train.txt, test.txt and yoeo.names +""" + + +def range_limited_float_type_0_to_1(arg): + """Type function for argparse - a float within some predefined bounds + Derived from 'https://stackoverflow.com/questions/55324449/how-to-specify-a-minimum-or-maximum-float-value-with-argparse/55410582#55410582'. 
+ """ + minimum = 0.0 + maximum = 1.0 + try: + f = float(arg) + except ValueError: + raise argparse.ArgumentTypeError("Must be a floating point number") + if f < minimum or f > maximum: + raise argparse.ArgumentTypeError(f"Argument must be between {minimum} and {maximum}") + return f + + +parser = argparse.ArgumentParser(description="Create YOEO labels from yaml files.") +parser.add_argument("dataset_dir", type=str, help="Directory to a dataset. Output will be written here, unless --destination-dir is given.") +parser.add_argument("annotation_file", type=str, help="Full path of annotation file") +parser.add_argument("testsplit", type=range_limited_float_type_0_to_1, help="Amount of test images from total images: train/test split (between 0.0 and 1.0)") +parser.add_argument("-s", "--seed", type=int, default=random.randint(0, (2**64)-1), help="Seed, that controls the train/test split (integer)") +parser.add_argument("--destination-dir", type=str, default="", help="Writes output files to specified directory.") +parser.add_argument("--create-symlinks", action="store_true", help="Create symlinks for image files to destination-dir. Useful, when using read-only datasets. Requires --destination-dir") +parser.add_argument("--ignore-blurred", action="store_true", help="Ignore blurred labels") +parser.add_argument("--ignore-concealed", action="store_true", help="Ignore concealed labels") +parser.add_argument("--ignore-classes", nargs="+", default=[], help="Append class names, to be ignored") +args = parser.parse_args() + +# Remove ignored classes from CLASSES list +for ignore_class in args.ignore_classes: + for category in CLASSES.keys(): + if ignore_class in CLASSES[category]: + CLASSES[category].remove(ignore_class) + print(f"Ignoring class '{ignore_class}'") + +# Defaults +create_symlinks = False +dataset_dir = args.dataset_dir +destination_dir = args.dataset_dir +image_names = [] # Collect image paths for train/test split + +# Overwrite defaults, if destination path is given +if args.destination_dir: + create_symlinks = args.create_symlinks + destination_dir = args.destination_dir + +# Create output directories if needed +images_dir = os.path.join(destination_dir, "images") +if not os.path.exists(images_dir): + os.makedirs(images_dir) + +labels_dir = os.path.join(destination_dir, "labels") +if not os.path.exists(labels_dir): + os.makedirs(labels_dir) + +masks_dir = os.path.join(destination_dir, "segmentations") +if not os.path.exists(masks_dir): + os.makedirs(masks_dir) + +# Load annotation data from yaml file +annotations_file = args.annotation_file #os.path.join(dataset_dir, "annotations.yaml") +with open(annotations_file) as f: + export = yaml.safe_load(f) + +for img_name, frame in export['images'].items(): + image_names.append(img_name) # Collect image names + + # Generate segmentations in correct format + seg_path = os.path.join(dataset_dir, "segmentations", os.path.splitext(img_name)[0] + ".png") + seg_in = cv2.imread(seg_path) + if seg_in is not None: + mask = np.zeros(seg_in.shape[:2], dtype=np.uint8) + mask += ((seg_in == (127, 127, 127)).all(axis=2)).astype(np.uint8) # Lines + mask += (((seg_in == (254, 254, 254)).all(axis=2)).astype(np.uint8) * 2) # Field + seg_out = np.zeros(seg_in.shape, dtype=np.uint8) + seg_out[..., 0] = mask + seg_out[..., 1] = mask + seg_out[..., 2] = mask + cv2.imwrite(os.path.join(masks_dir, os.path.splitext(img_name)[0] + ".png"), seg_out) + else: + print(f"No segmentation found: '{seg_path}'") + continue + + name = os.path.splitext(img_name)[0] # Remove file 
extension
+    imgwidth = frame['width']
+    imgheight = frame['height']
+    annotations = []
+
+    for annotation in frame['annotations']:
+        # Ignore if blurred or concealed and should be ignored
+        if not ((args.ignore_blurred and annotation['blurred']) or
+                (args.ignore_concealed and annotation['concealed'])):
+
+            if annotation['type'] in CLASSES['bb_classes']:  # Handle bounding boxes
+                if annotation['in_image']:
+                    min_x = min(map(lambda x: x[0], annotation['vector']))
+                    max_x = max(map(lambda x: x[0], annotation['vector']))
+                    min_y = min(map(lambda x: x[1], annotation['vector']))
+                    max_y = max(map(lambda x: x[1], annotation['vector']))
+
+                    annowidth = max_x - min_x
+                    annoheight = max_y - min_y
+                    relannowidth = annowidth / imgwidth
+                    relannoheight = annoheight / imgheight
+
+                    center_x = min_x + (annowidth / 2)
+                    center_y = min_y + (annoheight / 2)
+                    relcenter_x = center_x / imgwidth
+                    relcenter_y = center_y / imgheight
+
+                    if annotation['type'] != "robot":
+                        classID = CLASSES['bb_classes'].index(annotation['type'])  # Derive classID from index in predefined classes
+                    else:
+                        if annotation["number"] is None:
+                            number = "None"
+                        else:
+                            number = str(annotation["number"])
+                        classID = ROBOT_COLOR_NUMBERS.index(f"{annotation['color']}_{number}") + 1
+                    annotations.append(f"{classID} {relcenter_x} {relcenter_y} {relannowidth} {relannoheight}")  # Append to store it later
+                else:  # Annotation is not in image
+                    continue
+            elif annotation['type'] in CLASSES['segmentation_classes']:  # Handle segmentations
+                continue
+            elif annotation['type'] in CLASSES['ignored_classes']:  # Ignore this annotation
+                continue
+            else:
+                print(f"The annotation type '{annotation['type']}' is not supported or should be ignored. Image: '{img_name}'")
+
+    # Store BB annotations in .txt file
+    with open(os.path.join(labels_dir, name + ".txt"), "w") as output:
+        output.writelines([annotation + "\n" for annotation in annotations])
+
+# Create symlinks for images to destination directory
+# This is helpful if the dataset directory is read-only
+if create_symlinks:
+    for image_name in image_names:
+        link_path = os.path.join(images_dir, image_name)
+        target_path = os.path.join(dataset_dir, "images", image_name)
+        os.symlink(target_path, link_path)
+
+# Seed is used for train/test split
+random.seed(args.seed)
+print(f"Using seed: {args.seed}")
+
+# Generate train/test split of images
+image_names = random.sample(sorted(image_names), len(image_names))  # Sort for a consistent order, then shuffle with the seeded RNG
+train_images = image_names[0:round(len(image_names) * (1 - args.testsplit))]  # Split first range
+test_images = image_names[round(len(image_names) * (1 - args.testsplit)):]  # Split last range
+
+# Generate meta files
+train_images = set(train_images)  # Prevent images from showing up twice
+train_path = os.path.join(destination_dir, "train.txt")
+with open(train_path, "w") as train_file:
+    train_file.writelines([str(os.path.join(destination_dir, image_name)) + "\n" for image_name in train_images])
+
+test_images = set(test_images)  # Prevent images from showing up twice
+test_path = os.path.join(destination_dir, "test.txt")
+with open(test_path, "w") as test_file:
+    test_file.writelines([str(os.path.join(destination_dir, image_name)) + "\n" for image_name in test_images])
+
+names_path = os.path.join(destination_dir, "yoeo.names")
+with open(names_path, "w") as names_file:
+    names_file.writelines([class_name + "\n" for class_name in CLASSES['bb_classes']])
+
+data_path = os.path.join(destination_dir, "yoeo.data")
+with open(data_path, "w") as data_file:
+    data_file.write(f"train={train_path}\n")
+    data_file.write(f"valid={test_path}\n")
+    data_file.write(f"names={names_path}\n")
diff --git a/yoeo/detect.py b/yoeo/detect.py
index d223d2e..3a54472 100755
--- a/yoeo/detect.py
+++ b/yoeo/detect.py
@@ -18,7 +18,7 @@
 from yoeo.models import load_model
 from yoeo.utils.class_config import ClassConfig
-from yoeo.utils.dataclasses import ClassNames, SqueezeConfig
+from yoeo.utils.dataclasses import ClassNames, GroupConfig
 from yoeo.utils.utils import rescale_boxes, non_max_suppression, print_environment_info, rescale_segmentation
 from yoeo.utils.datasets import ImageFolder
 from yoeo.utils.transforms import Resize, DEFAULT_TRANSFORMS
@@ -30,7 +30,7 @@ def detect_directory(model_path, weights_path, img_path, class_config: ClassConfig,
                      output_path,
                      batch_size=8, img_size=416, n_cpu=8, conf_thres=0.5, nms_thres=0.5,
-                     robot_class_ids: Optional[List[int]] = None):
+                     ):
     """Detects objects on all images in specified directory and saves output images with drawn detections.
 
     :param model_path: Path to model definition file (.cfg)
@@ -53,8 +53,6 @@ def detect_directory(model_path, weights_path, img_path, class_config: ClassConf
     :type conf_thres: float, optional
     :param nms_thres: IOU threshold for non-maximum suppression, defaults to 0.5
     :type nms_thres: float, optional
-    :param robot_class_ids: List of class IDs of robot classes if multiple robot classes exist.
-    :type robot_class_ids: List[int], optional
     """
     dataloader = _create_data_loader(img_path, batch_size, img_size, n_cpu)
     model = load_model(model_path, weights_path)
@@ -65,7 +63,7 @@ def detect_directory(model_path, weights_path, img_path, class_config: ClassConf
         output_path,
         conf_thres,
         nms_thres,
-        class_config.get_squeeze_config()
+        class_config.get_group_config()
     )
     _draw_and_save_output_images(
-        img_detections, segmentations, imgs, img_size, output_path, class_config.get_unsqueezed_det_class_names())
+        img_detections, segmentations, imgs, img_size, output_path, class_config.get_ungrouped_det_class_names())
@@ -78,7 +76,7 @@ def detect_image(model,
                  img_size: int = 416,
                  conf_thres: float = 0.5,
                  nms_thres: float = 0.5,
-                 squeeze_config: Optional[SqueezeConfig] = None
+                 group_config: Optional[GroupConfig] = None
                  ):
     """Inferences one image with model.
 
@@ -92,8 +90,8 @@ def detect_image(model,
     :type conf_thres: float
     :param nms_thres: IOU threshold for non-maximum suppression, defaults to 0.5
     :type nms_thres: float
-    :param squeeze_config: SqueezeConfiguration for this model (optional, defaults to None)
-    :type squeeze_config: Optional[SqueezeConfig]
+    :param group_config: GroupConfiguration for this model (optional, defaults to None)
+    :type group_config: Optional[GroupConfig]
     :return: Detections on image with each detection in the format: [x1, y1, x2, y2, confidence, class],
         Segmentation as 2d numpy array with the coresponding class id in each cell
     :rtype: nd.array, nd.array
@@ -118,7 +116,7 @@ def detect_image(model,
         prediction=detections,
         conf_thres=conf_thres,
         iou_thres=nms_thres,
-        squeeze_config=squeeze_config
+        group_config=group_config
     )
     detections = rescale_boxes(detections[0], img_size, image.shape[0:2])
     segmentations = rescale_segmentation(segmentations, image.shape[0:2])
@@ -130,7 +128,7 @@ def detect(model,
            output_path: str,
            conf_thres: float = 0.5,
            nms_thres: float = 0.5,
-           squeeze_config: Optional[SqueezeConfig] = None
+           group_config: Optional[GroupConfig] = None
            ):
     """Inferences images with model.
@@ -144,8 +142,8 @@ def detect(model,
     :type conf_thres: float
     :param nms_thres: IOU threshold for non-maximum suppression, defaults to 0.5
     :type nms_thres: float
-    :param squeeze_config: SqueezeConfiguration for this model (optional, defaults to None)
-    :type squeeze_config: Optional[SqueezeConfig]
+    :param group_config: GroupConfig for this model (optional, defaults to None)
+    :type group_config: Optional[GroupConfig]
     :return: List of detections. The coordinates are given for the padded image that is provided by the dataloader.
         Use `utils.rescale_boxes` to transform them into the desired input image coordinate system before its transformed by the dataloader),
@@ -174,7 +172,7 @@ def detect(model,
             prediction=detections,
             conf_thres=conf_thres,
             iou_thres=nms_thres,
-            squeeze_config=squeeze_config
+            group_config=group_config
         )
 
         # Store image and detections
diff --git a/yoeo/test.py b/yoeo/test.py
index d93d1b7..aa5cb5b 100755
--- a/yoeo/test.py
+++ b/yoeo/test.py
@@ -93,7 +93,7 @@ def print_eval_stats(metrics_output: Optional[Tuple[np.ndarray]],
             mbACC = secondary_metric.mbACC()
 
             if verbose:
-                classes = class_config.get_squeeze_class_names()
+                classes = class_config.get_group_class_names()
                 mbACC_per_class = [secondary_metric.bACC(i) for i in range(len(classes))]
 
                 sec_table = [["Index", "Class", "bACC"]]
@@ -176,14 +176,14 @@ def _evaluate(model, dataloader, class_config, img_size, iou_thres, conf_thres,
             yolo_outputs,
             conf_thres=conf_thres,
             iou_thres=nms_thres,
-            squeeze_config=class_config.get_squeeze_config()
+            group_config=class_config.get_group_config()
         )
 
         sample_stat, secondary_stat = get_batch_statistics(
             yolo_outputs,
             bb_targets,
             iou_threshold=iou_thres,
-            squeeze_config=class_config.get_squeeze_config()
+            group_config=class_config.get_group_config()
         )
 
         sample_metrics += sample_stat
diff --git a/yoeo/utils/class_config.py b/yoeo/utils/class_config.py
index c16d303..3315726 100644
--- a/yoeo/utils/class_config.py
+++ b/yoeo/utils/class_config.py
@@ -4,7 +4,7 @@
 
 from typing import Dict, List, Any, Optional
 
-from yoeo.utils.dataclasses import ClassNames, SqueezeConfig
+from yoeo.utils.dataclasses import ClassNames, GroupConfig
 
 
 class ClassConfig:
@@ -12,107 +12,107 @@ def __init__(self, content: Dict[Any, Any], class_names: ClassNames):
         self._det_class_names: List[str] = class_names.detection
         self._seg_class_names: List[str] = class_names.segmentation
 
-        self._class_names_to_squeeze: List[str] = content["squeeze_classes"]
-        self._squeeze_surrogate_name: Optional[str] = content["surrogate_class"]
+        self._class_names_to_group: List[str] = content["group_classes"]
+        self._group_surrogate_name: Optional[str] = content["surrogate_class"]
 
-        self._ids_to_squeeze: Optional[List[int]] = self._compute_squeeze_ids()
-        self._squeezed_det_class_names: List[str] = self._squeeze_class_names()
+        self._ids_to_group: Optional[List[int]] = self._compute_group_ids()
+        self._grouped_det_class_names: List[str] = self._group_class_names()
 
-    def _compute_squeeze_ids(self) -> Optional[List[int]]:
+    def _compute_group_ids(self) -> Optional[List[int]]:
         """
-        Given the list of detection class names and the list of class names that should be squeezed into one class,
+        Given the list of detection class names and the list of class names that should be grouped into one class,
         compute the ids of the latter classes, i.e. their position in the list of detection class names.
 
-        :return: The ids of all class names that should be squeezed into one class if there are any. None otherwise.
+ :return: The ids of all class names that should be grouped into one class if there are any. None otherwise. :rtype: Optional[List[int]] """ squeeze_ids = None - if self._class_names_to_squeeze: + if self._class_names_to_group: squeeze_ids = [] for idx, class_name in enumerate(self._det_class_names): - if class_name in self._class_names_to_squeeze: + if class_name in self._class_names_to_group: squeeze_ids.append(idx) return squeeze_ids - def _squeeze_class_names(self) -> List[str]: + def _group_class_names(self) -> List[str]: """ - Given the list of detection class names and the list of class names that should be squeezed into one class, + Given the list of detection class names and the list of class names that should be grouped into one class, compute a new list of class names in which all of the latter class names are removed and the surrogate class - name is inserted at the position of the first class of the classes that should be squeezed. + name is inserted at the position of the first class of the classes that should be grouped. - :return: A list of class names in which all class names that should be squeezed are removed and the surrogate + :return: A list of class names in which all class names that should be grouped are removed and the surrogate class name is inserted as a surrogate for those classes :rtype: List[str] """ # Copy the list of detection class names - squeezed_class_names = list(self._det_class_names) + grouped_class_names = list(self._det_class_names) - if self._class_names_to_squeeze: - # Insert the surrogate class name before the first to be squeezed class name - squeezed_class_names.insert(self.get_surrogate_id(), self._squeeze_surrogate_name) + if self._class_names_to_group: + # Insert the surrogate class name before the first to be grouped class name + grouped_class_names.insert(self.get_surrogate_id(), self._group_surrogate_name) - # Remove all to be squeezed class names - for name in self._class_names_to_squeeze: - squeezed_class_names.remove(name) + # Remove all to be grouped class names + for name in self._class_names_to_group: + grouped_class_names.remove(name) - return squeezed_class_names + return grouped_class_names - def get_squeeze_config(self) -> Optional[SqueezeConfig]: + def get_group_config(self) -> Optional[GroupConfig]: """ - Get the current 'SqueezeConfig'. + Get the current 'GroupConfig'. - :return: The current 'SqueezeConfig' if neither 'self.get_squeeze_ids()' nor 'self.get_surrogate_id()' is + :return: The current 'GroupConfig' if neither 'self.get_group_ids()' nor 'self.get_surrogate_id()' is 'None'. Return 'None' otherwise. 
- :rtype: Optional[SqueezeConfig] + :rtype: Optional[GroupConfig] """ - squeeze_ids = self.get_squeeze_ids() + group_ids = self.get_squeeze_ids() surrogate_id = self.get_surrogate_id() - if squeeze_ids is None or surrogate_id is None: + if group_ids is None or surrogate_id is None: return None else: - return SqueezeConfig(squeeze_ids, surrogate_id) + return GroupConfig(group_ids, surrogate_id) - def get_squeeze_class_names(self) -> List[str]: + def get_group_class_names(self) -> List[str]: """ - Get the class names of the classes that should be squeezed together during evaluation + Get the class names of the classes that should be grouped together during evaluation - :return: a list of class names that should be squeezed together during evaluation + :return: a list of class names that should be grouped together during evaluation :rtype: List[str] """ - return self._class_names_to_squeeze + return self._class_names_to_group def get_surrogate_id(self) -> Optional[int]: """ - Get the id of the surrogate class in the list of squeezed class names. If there are no classes to squeezed, + Get the id of the surrogate class in the list of grouped class names. If there are no classes to be grouped, None is returned. - :return: The id of the surrogate class in the list of squeezed class names if there are classes that should be - squeezed. None otherwise. + :return: The id of the surrogate class in the list of grouped class names if there are classes that should be + grouped. None otherwise. :rtype: Optional[int] """ - return None if not self._ids_to_squeeze else self._ids_to_squeeze[0] + return None if not self._ids_to_group else self._ids_to_group[0] - def get_squeezed_det_class_names(self) -> List[str]: + def get_grouped_det_class_names(self) -> List[str]: """ - Get the squeezed list of detection class names. + Get the grouped list of detection class names. - :return: The squeezed list of detection class names. + :return: The grouped list of detection class names. :rtype: List[str] """ - return self._squeezed_det_class_names + return self._grouped_det_class_names - def get_unsqueezed_det_class_names(self) -> List[str]: + def get_ungrouped_det_class_names(self) -> List[str]: """ - Get the unsqueezed list of detection class names. + Get the ungrouped list of detection class names. - :return: The unsqueezed list of detection class names. + :return: The ungrouped list of detection class names. :rtype: List[str] """ @@ -130,49 +130,49 @@ def get_seg_class_names(self) -> List[str]: def get_squeeze_ids(self) -> Optional[List[int]]: """ - Get the (unsqueezed) ids of the class names that should be squeezed into one class. + Get the (ungrouped) ids of the class names that should be grouped into one class. - :return: A list of unsqueezed ids for the class names that should be squeezed into one class if there are any. + :return: A list of ungrouped ids for the class names that should be grouped into one class if there are any. None otherwise :rtype: Optional[List[int]] """ - return self._ids_to_squeeze + return self._ids_to_group def get_surrogate_name(self) -> Optional[str]: """ - Get the class name of the surrogate class if there are classes that should be squeezed into one class. Return + Get the class name of the surrogate class if there are classes that should be grouped into one class. Return None otherwise. - :return: The name of the surrogate class if there are classes that should be squeezed into one class. 
None
+        :return: The name of the surrogate class if there are classes that should be grouped into one class. None
             otherwise.
         :rtype: Optional[List[str]]
         """
-        return self._squeeze_surrogate_name
+        return self._group_surrogate_name
 
-    def classes_should_be_squeezed(self) -> bool:
+    def classes_should_be_grouped(self) -> bool:
         """
-        Return true if there are classes that should be squeezed into one class. Return false otherwise.
+        Return true if there are classes that should be grouped into one class. Return false otherwise.
 
-        :return: true if there are classes that should be squeezed into on class. False otherwise.
+        :return: true if there are classes that should be grouped into one class. False otherwise.
         :rtype: bool
         """
-        return self._ids_to_squeeze is not None
+        return self._ids_to_group is not None
 
-    def squeeze(self, labels: List[int]) -> List[int]:
+    def group(self, labels: List[int]) -> List[int]:
         """
-        Squeeze a list of class ids. Given a set of classes that should be squeezed X, replace all class ids in X by
+        Group a list of class ids. Given a set X of classes that should be grouped, replace all class ids in X by
         the surrogate id.
 
-        :param labels: list of class ids to squeeze.
+        :param labels: list of class ids to group.
         :type labels: List[int]
-        :return: squeezed list of class ids where
+        :return: grouped list of class ids in which every grouped id is replaced by the surrogate id
         :rtype: List[int]
         """
         surrogate_id = self.get_surrogate_id()
 
-        return [label if label not in self._ids_to_squeeze else surrogate_id for label in labels]
+        return [label if label not in self._ids_to_group else surrogate_id for label in labels]
 
     @classmethod
     def load_from(cls, path: str, class_names: ClassNames) -> ClassConfig:
diff --git a/yoeo/utils/dataclasses.py b/yoeo/utils/dataclasses.py
index 3d2590b..c2973cd 100644
--- a/yoeo/utils/dataclasses.py
+++ b/yoeo/utils/dataclasses.py
@@ -29,6 +29,6 @@ def _read_yaml_file(path: str) -> Dict[Any, Any]:
 
 
 @dataclass
-class SqueezeConfig:
-    squeeze_ids: List[int]
+class GroupConfig:
+    group_ids: List[int]
     surrogate_id: int
diff --git a/yoeo/utils/utils.py b/yoeo/utils/utils.py
index c526694..de62950 100644
--- a/yoeo/utils/utils.py
+++ b/yoeo/utils/utils.py
@@ -11,7 +11,7 @@ import random
 
 from typing import List, Optional, Tuple
 
-from yoeo.utils.dataclasses import SqueezeConfig
+from yoeo.utils.dataclasses import GroupConfig
 from yoeo.utils.metric import Metric
 
 
@@ -291,26 +291,26 @@ def compute_ap(recall, precision):
 
 def get_batch_statistics(outputs, targets, iou_threshold,
-                         squeeze_config: Optional[SqueezeConfig] = None
+                         group_config: Optional[GroupConfig] = None
                          ) -> Tuple[List, Optional[Metric]]:
     """
-    Calculcate the batch statistics. If 'squeeze_config' is not 'None', the contained classes will be squeezed into one
-    class ('SqueezeConfig.surrogate_id') for batch statistics evaluation and evalutated separately on a secondary class
-    label. The statistics for the latter are returned as a 'Metric' object. If 'squeeze_config' is None, no 'Metric'
+    Calculate the batch statistics. If 'group_config' is not 'None', the contained classes will be grouped into one
+    class ('GroupConfig.surrogate_id') for batch statistics evaluation and evaluated separately on a secondary class
+    label. The statistics for the latter are returned as a 'Metric' object. If 'group_config' is None, no 'Metric'
     object will be returned and the tuple will simply contain 'None' at the respective position.
:return: The batch statistics, as well as an optional Metric object for the secondary class argument if - 'squeeze_config' is not None + 'group_config' is not None :rtype: Tuple[List, Optional[Metric]] """ """ Compute true positives, predicted scores and predicted labels per sample """ batch_metrics = [] - squeeze_active: bool = squeeze_config is not None + grouping_active: bool = group_config is not None - if squeeze_active: - secondary_metric = Metric(len(squeeze_config.squeeze_ids)) - squeeze_ids = torch.tensor(squeeze_config.squeeze_ids) + if grouping_active: + secondary_metric = Metric(len(group_config.group_ids)) + group_ids = torch.tensor(group_config.group_ids) else: secondary_metric = None @@ -323,18 +323,18 @@ class ('SqueezeConfig.surrogate_id') for batch statistics evaluation and evaluta pred_scores = output[:, 4] pred_labels = output[:, -1] - if squeeze_active: - sec_pred_labels = compute_secondary_labels(pred_labels, squeeze_ids) - pred_labels = squeeze_primary_labels(pred_labels, squeeze_ids, squeeze_config.surrogate_id) + if grouping_active: + sec_pred_labels = compute_secondary_labels(pred_labels, group_ids) + pred_labels = group_primary_labels(pred_labels, group_ids, group_config.surrogate_id) true_positives = np.zeros(pred_boxes.shape[0]) annotations = targets[targets[:, 0] == sample_i][:, 1:] target_labels = annotations[:, 0] if len(annotations) else [] - if squeeze_active and type(target_labels) is not list: - sec_target_labels = compute_secondary_labels(target_labels, squeeze_ids) - target_labels = squeeze_primary_labels(target_labels, squeeze_ids, squeeze_config.surrogate_id) + if grouping_active and type(target_labels) is not list: + sec_target_labels = compute_secondary_labels(target_labels, group_ids) + target_labels = group_primary_labels(target_labels, group_ids, group_config.surrogate_id) if len(annotations): detected_boxes = [] @@ -364,33 +364,33 @@ class ('SqueezeConfig.surrogate_id') for batch statistics evaluation and evaluta true_positives[pred_i] = 1 detected_boxes += [box_index] - if squeeze_active: + if grouping_active: sec_pred_label = sec_pred_labels[pred_i] - if pred_label in squeeze_ids: + if pred_label in group_ids: secondary_metric.update(sec_pred_label.int(), sec_target_labels[box_index].int()) batch_metrics.append([true_positives, pred_scores, pred_labels]) return batch_metrics, secondary_metric -def compute_secondary_labels(labels: torch.tensor, squeeze_ids: torch.tensor) -> torch.tensor: +def compute_secondary_labels(labels: torch.tensor, group_ids: torch.tensor) -> torch.tensor: secondary_labels = labels.clone() - # We replace the actual class labels with values from {0, ...} for classes that should be squeezed into a + # We replace the actual class labels with values from {0, ...} for classes that should be grouped into a # single class. All other classes get the label -1. 
-    for idx, squeeze_id in enumerate(squeeze_ids):
+    for idx, squeeze_id in enumerate(group_ids):
         # Replace label with value in {0, ...}
         secondary_labels[labels == squeeze_id] = idx
 
     # Replace all other labels with -1
-    secondary_labels[torch.logical_not(torch.isin(labels, squeeze_ids))] = -1
+    secondary_labels[torch.logical_not(torch.isin(labels, group_ids))] = -1
 
     return secondary_labels
 
 
-def squeeze_primary_labels(labels: torch.tensor, squeeze_ids: torch.tensor, surrogate_id: int) -> torch.tesor:
-    # Replace all primary labels that are contained in squeeze_ids with the surrogate_id
-    labels[torch.isin(labels, squeeze_ids)] = surrogate_id
+def group_primary_labels(labels: torch.tensor, group_ids: torch.tensor, surrogate_id: int) -> torch.tensor:
+    # Replace all primary labels that are contained in group_ids with the surrogate_id
+    labels[torch.isin(labels, group_ids)] = surrogate_id
 
     return labels
 
@@ -467,10 +467,10 @@ def box_area(box):
 
 
 def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None,
-                        squeeze_config: Optional[SqueezeConfig] = None):
+                        group_config: Optional[GroupConfig] = None):
     """
-    Performs Non-Maximum Suppression (NMS) on inference results. If 'squeeze_config' is not 'None', the contained
-    classes will be treated as one class ('SqueezeConfig.surrogate_id') during non-maximum supression.
+    Performs Non-Maximum Suppression (NMS) on inference results. If 'group_config' is not 'None', the contained
+    classes will be treated as one class ('GroupConfig.surrogate_id') during non-maximum suppression.
 
     Returns:
          detections with shape: nx6 (x1, y1, x2, y2, conf, cls)
@@ -488,8 +488,8 @@ def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=Non
     t = time.time()
     output = [torch.zeros((0, 6), device="cpu")] * prediction.shape[0]
 
-    if squeeze_config:
-        squeeze_ids = torch.tensor(squeeze_config.squeeze_ids, device=prediction.device, dtype=prediction.dtype)
+    if group_config:
+        group_ids = torch.tensor(group_config.group_ids, device=prediction.device, dtype=prediction.dtype)
 
     for xi, x in enumerate(prediction):  # image index, image inference
         # Apply constraints
@@ -527,13 +527,13 @@ def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=Non
         x = x[x[:, 4].argsort(descending=True)[:max_nms]]
 
         # Batched NMS
-        if squeeze_config is None:
+        if group_config is None:
             c = x[:, 5:6] * max_wh  # classes
         else:
             # If for example multiple robot classes are present, all robot classes are treated as one class in order
             # to perform nms across all classes and not per class. For this, all robot classes get the same offset.
             c = torch.clone(x[:, 5:6])
-            c[torch.isin(c, squeeze_ids)] = squeeze_config.surrogate_id
+            c[torch.isin(c, group_ids)] = group_config.surrogate_id
             c *= max_wh
 
         # boxes (offset by class), scores
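Editorial usage note (not part of the patch): the sketch below shows how the renamed grouping API fits together after this change. It assumes the yoeo package from this repository is importable and that ClassNames is the dataclass from yoeo.utils.dataclasses with `detection` and `segmentation` fields; the class names and config values are illustrative only, mirroring class_config/colored_robots.yaml.

# Editorial sketch -- illustrative values, not part of the diff.
from yoeo.utils.class_config import ClassConfig
from yoeo.utils.dataclasses import ClassNames

# Hypothetical class layout; only the ordering matters for the ids below.
class_names = ClassNames(
    detection=["ball", "robot_red", "robot_blue", "robot_unknown"],
    segmentation=["background", "lines", "field"],
)

# The keys match the renamed YAML keys ("group_classes", "surrogate_class").
class_config = ClassConfig(
    {"group_classes": ["robot_red", "robot_blue", "robot_unknown"], "surrogate_class": "robot"},
    class_names,
)

group_config = class_config.get_group_config()
print(group_config)                                 # GroupConfig(group_ids=[1, 2, 3], surrogate_id=1)
print(class_config.get_grouped_det_class_names())   # ['ball', 'robot']
print(class_config.group([0, 2, 3]))                # [0, 1, 1] -- robot ids collapse onto the surrogate id

The resulting group_config object is what detect.py and test.py now pass to non_max_suppression and get_batch_statistics via the group_config keyword, so all robot classes are treated as a single class during NMS and primary evaluation while color/number remain a secondary metric.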