diff --git a/image_recognition_face_recognition/package.xml b/image_recognition_face_recognition/package.xml
index 403ad436..24baecfa 100644
--- a/image_recognition_face_recognition/package.xml
+++ b/image_recognition_face_recognition/package.xml
@@ -20,9 +20,11 @@
diagnostic_updater
image_recognition_msgs
image_recognition_util
+ python3-facenet-pytorch-pip
python3-numpy
python3-opencv
rospy
+ std_srvs
catkin_lint_cmake
diff --git a/image_recognition_face_recognition/scripts/face_recognition_node b/image_recognition_face_recognition/scripts/face_recognition_node
index eee211ad..57165c4d 100755
--- a/image_recognition_face_recognition/scripts/face_recognition_node
+++ b/image_recognition_face_recognition/scripts/face_recognition_node
@@ -2,19 +2,17 @@
import math
import os
import sys
+
import diagnostic_updater
import rospy
from cv_bridge import CvBridge, CvBridgeError
-from sensor_msgs.msg import RegionOfInterest, Image
-from std_srvs.srv import Empty
-from image_recognition_msgs.msg import (
- Recognition,
- Recognitions,
- CategoryProbability,
- CategoricalDistribution,
-)
-from image_recognition_msgs.srv import Recognize, Annotate
+from image_recognition_msgs.msg import CategoricalDistribution, CategoryProbability, Recognition, Recognitions
+from image_recognition_msgs.srv import Annotate, Recognize
from image_recognition_util import image_writer
+from sensor_msgs.msg import Image, RegionOfInterest
+from std_srvs.srv import Empty, EmptyRequest, EmptyResponse
+
from image_recognition_face_recognition.face_recognizer import FaceRecognizer
@@ -48,6 +46,8 @@ class OpenfaceROS:
self._bridge = CvBridge()
self._annotate_srv = rospy.Service("annotate", Annotate, self._annotate_srv)
self._recognize_srv = rospy.Service("recognize", Recognize, self._recognize_srv)
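+        # The "clear" service (std_srvs/Empty) below wipes all trained faces at runtime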
+ self._clear_srv = rospy.Service("clear", Empty, self._clear_srv)
+
self._image_subscriber = rospy.Subscriber("image", Image, self._image_callback)
self._recognitions_publisher = rospy.Publisher("recognitions", Recognitions, queue_size=10)
@@ -80,8 +80,8 @@ class OpenfaceROS:
for annotation in req.annotations:
roi_image = bgr_image[
- annotation.roi.y_offset: annotation.roi.y_offset + annotation.roi.height,
- annotation.roi.x_offset: annotation.roi.x_offset + annotation.roi.width,
+ annotation.roi.y_offset : annotation.roi.y_offset + annotation.roi.height,
+ annotation.roi.x_offset : annotation.roi.x_offset + annotation.roi.width,
]
if self._save_images_folder:
@@ -96,18 +96,25 @@ class OpenfaceROS:
return {}
- def _get_recognitions(self, image_msg, save_images, publish_images):
+    def _clear_srv(self, _: EmptyRequest) -> EmptyResponse:
+ """
+ Service to clear the trained faces
+ """
+ self._face_recognizer.clear_trained_faces()
+ return EmptyResponse()
+
+ def _get_recognitions(self, image_msg: Image, save_images: bool, publish_images: bool):
-        # Convert to opencv image
         """
-        Recognize service callback
-        :param req: The input image
+        Get the face recognitions in an image
+
+        :param image_msg: The input image
+        :param save_images: Whether to write the (annotated) images to disk
+        :param publish_images: Whether to publish the result image
         :return: Recognitions
         """
+        # Convert to opencv image
try:
bgr_image = self._bridge.imgmsg_to_cv2(image_msg, "bgr8")
except CvBridgeError as e:
- raise Exception("Could not convert to opencv image: %s" % str(e))
+ raise Exception(f"Could not convert to opencv image: {e}")
# Write raw image
if save_images:
@@ -115,26 +122,27 @@ class OpenfaceROS:
images = []
labels = []
-
- # Call facebet neural network in two stages
- face_recognitions = self._face_recognizer.face_detection(bgr_image)
+
+ # Call facenet neural network in two stages
+ face_recognitions = self._face_recognizer.detect(bgr_image)
-        distance, labels_pred = self._face_recognizer.detection_recognition(bgr_image, labels, train=True)
+        # detect() attaches the l2 distances to each face; derive the best match per face here
+        best_matches = [min(fr.l2_distances, key=lambda d: d.distance, default=None) for fr in face_recognitions]
+        distance = [match.distance if match else float("inf") for match in best_matches]
+        labels_pred = [match.label if match else "" for match in best_matches]
-
+
# Fill recognitions
recognitions = []
# rospy.loginfo("Face recognitions: %s", face_recognitions)
label_idx = 0
-        for fr in face_recognitions:
-            face_recognition = [math.floor(xi) for xi in fr]
+        for fr in face_recognitions:
             if save_images:
                 label = labels_pred[label_idx]
                 roi_image = bgr_image[
-                    face_recognition[2]: face_recognition[3],
-                    face_recognition[0]: face_recognition[1],
+                    fr.roi.y_offset : fr.roi.y_offset + fr.roi.height,
+                    fr.roi.x_offset : fr.roi.x_offset + fr.roi.width,
                 ]
image_writer.write_annotated(self._save_images_folder, roi_image, label, False)
+
images.append(roi_image)
labels.append(label)
label = labels_pred[label_idx]
@@ -144,15 +152,16 @@ class OpenfaceROS:
categorical_distribution=CategoricalDistribution(
unknown_probability=0.0, # TODO: When is it unknown?
probabilities=[
- # This line needs some changing
- CategoryProbability(label=label, probability=1.0 / (distance_fr + 0.001)) for l2 in face_recognition
+                            # Score each trained face by its l2 distance (smaller distance, higher probability)
+                            CategoryProbability(label=l2.label, probability=1.0 / (l2.distance + 0.001))
+                            for l2 in fr.l2_distances
],
),
roi=RegionOfInterest(
- x_offset=face_recognition[0],
- y_offset=face_recognition[1],
- width=face_recognition[2] - face_recognition[0],
- height=face_recognition[3] - face_recognition[1],
+                        x_offset=fr.roi.x_offset,
+                        y_offset=fr.roi.y_offset,
+                        width=fr.roi.width,
+                        height=fr.roi.height,
),
)
)
@@ -175,7 +184,7 @@ class OpenfaceROS:
return recognitions
def _image_callback(self, image_msg):
- # Comment this exception for beeter debbuging
+        # Comment out this try/except for easier debugging
try:
recognitions = self._get_recognitions(
image_msg,
@@ -208,9 +217,15 @@ if __name__ == "__main__":
topic_publish_result_image = rospy.get_param("~topic_publish_result_image", True)
service_publish_result_image = rospy.get_param("~service_publish_result_image", True)
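+        # ~db: optional path to a pickled face database, e.g. one written by the train_from_images script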
+ db = rospy.get_param("~db", None)
+ if db:
+ db = os.path.expanduser(db)
+
save_images_folder = None
if save_images:
- save_images_folder = os.path.expanduser(rospy.get_param("~save_images_folder", "/tmp/facenet_saved_images"))
+ save_images_folder = os.path.expanduser(
+ rospy.get_param("~save_images_folder", "/tmp/face_recognition_saved_images")
+ )
except KeyError as e:
rospy.logerr("Parameter %s not found" % e)
sys.exit(1)
@@ -222,6 +237,11 @@ if __name__ == "__main__":
topic_publish_result_image,
service_publish_result_image,
)
+
+ if db:
+ rospy.loginfo(f"loading face database from {db}")
+ image_recognition_openface.restore_trained_faces(db)
+
updater = diagnostic_updater.Updater()
updater.setHardwareID("none")
updater.add(diagnostic_updater.Heartbeat())
diff --git a/image_recognition_face_recognition/scripts/get_face_recognition b/image_recognition_face_recognition/scripts/get_face_recognition
index eb459ef6..e7d6a993 100755
--- a/image_recognition_face_recognition/scripts/get_face_recognition
+++ b/image_recognition_face_recognition/scripts/get_face_recognition
@@ -1,14 +1,15 @@
#!/usr/bin/env python
-import math
-from typing import Optional
+from typing import List, Optional
import cv2
-from image_recognition_face_recognition.face_recognizer import FaceRecognizer
from image_recognition_msgs.msg import Recognition
from image_recognition_util import image_writer
from sensor_msgs.msg import RegionOfInterest
+from image_recognition_face_recognition.face_recognizer import FaceRecognizer, RecognizedFace
+
def main(image, db: Optional[str] = None):
# Read the image
@@ -20,26 +21,21 @@ def main(image, db: Optional[str] = None):
if db:
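+        # Restore a previously trained face database so known people can be labelled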
face_recognizer.restore_trained_faces(db)
- recognized_faces = face_recognizer.face_detection(img)
- print(recognized_faces)
+ recognized_faces: List[RecognizedFace] = face_recognizer.detect(img)
recognitions = []
for fr in recognized_faces:
- face_recognition = [math.floor(xi) for xi in fr]
recognitions.append(
Recognition(
roi=RegionOfInterest(
- x_offset=face_recognition[0],
- y_offset=face_recognition[1],
- width=face_recognition[2] - face_recognition[0],
- height=face_recognition[3] - face_recognition[1],
+ x_offset=fr.roi.x_offset, y_offset=fr.roi.y_offset, width=fr.roi.width, height=fr.roi.height
)
)
)
annotated_original_image = image_writer.get_annotated_cv_image(img, recognitions)
cv2.imshow("result", annotated_original_image)
- cv2.waitKey(1000)
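+    # Block until a key is pressed instead of closing after one second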
+ cv2.waitKey()
if __name__ == "__main__":
diff --git a/image_recognition_face_recognition/scripts/train_from_images b/image_recognition_face_recognition/scripts/train_from_images
index 55144a2b..907e5f18 100755
--- a/image_recognition_face_recognition/scripts/train_from_images
+++ b/image_recognition_face_recognition/scripts/train_from_images
@@ -1,10 +1,11 @@
#!/usr/bin/env python
+import logging
import os
+from argparse import Action, ArgumentParser, ArgumentTypeError, FileType, Namespace
+
import cv2
-import logging
-from argparse import ArgumentParser, Action, ArgumentTypeError, FileType
-from image_recognition_face_recognition.face_recognizer import FaceRecognizer
+from image_recognition_face_recognition.face_recognizer import FaceRecognizer
logging.basicConfig()
logger = logging.getLogger(__name__)
@@ -20,30 +21,30 @@ def main(modeldir, outfile, verbose: bool = False):
dirs = [o for o in os.listdir(modeldir) if os.path.isdir(os.path.join(modeldir, o))]
for cat in dirs:
- logger.info('loading images for %s', cat)
+ logger.info("loading images for %s", cat)
path = os.path.join(modeldir, cat)
- logger.debug('loading images from %s', path)
+ logger.debug("loading images from %s", path)
for filename in os.listdir(path):
f = os.path.join(path, filename)
- logger.debug('processing %s', f)
+ logger.debug("processing %s", f)
img = cv2.imread(f, 1) # load as color
try:
- logger.debug('training...')
+ logger.debug("training...")
face_recognizer.train(img, cat)
- logger.debug('success!')
+ logger.debug("success!")
except Exception:
- logger.exception('face_recognizer failed to process %s', f)
+ logger.exception("face_recognizer failed to process %s", f)
- logger.info('saving database...')
+ logger.info("saving database...")
face_recognizer.save_trained_faces(outfile)
- logger.info('done!')
+ logger.info("done!")
class ReadableDir(Action):
- def __call__(self, parser, namespace, values, option_string=None):
+ def __call__(self, parser: ArgumentParser, namespace: Namespace, values, option_string=None):
prospective_dir = values
if not os.path.isdir(prospective_dir):
raise ArgumentTypeError("readable_dir:{0} is not a valid path".format(prospective_dir))
@@ -53,13 +54,13 @@ class ReadableDir(Action):
raise ArgumentTypeError("readable_dir:{0} is not a readable dir".format(prospective_dir))
-if __name__ == '__main__':
- parser = ArgumentParser(description='Train openface from a database of images')
+if __name__ == "__main__":
+ parser = ArgumentParser(description="Train openface from a database of images")
- parser.add_argument('modeldir', action=ReadableDir, help='Directory with folders for each category')
- parser.add_argument('outfile', type=FileType('wb'), help='Where to output the trained faces database')
+ parser.add_argument("modeldir", action=ReadableDir, help="Directory with folders for each category")
+ parser.add_argument("outfile", type=FileType("wb"), help="Where to output the trained faces database")
- parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument("-v", "--verbose", action="store_true")
args = parser.parse_args()
main(**vars(args))
diff --git a/image_recognition_face_recognition/src/image_recognition_face_recognition/__init__.py b/image_recognition_face_recognition/src/image_recognition_face_recognition/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/image_recognition_face_recognition/src/image_recognition_face_recognition/face_recognizer.py b/image_recognition_face_recognition/src/image_recognition_face_recognition/face_recognizer.py
index 2ea45c9f..86e3717b 100644
--- a/image_recognition_face_recognition/src/image_recognition_face_recognition/face_recognizer.py
+++ b/image_recognition_face_recognition/src/image_recognition_face_recognition/face_recognizer.py
@@ -1,34 +1,152 @@
-from typing import List, Tuple, Union
-from facenet_pytorch import MTCNN, InceptionResnetV1
+from __future__ import annotations
+
import pickle
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import BinaryIO, List, Tuple
+
+import numpy as np
import rospy
import torch
-import numpy as np
+from facenet_pytorch import InceptionResnetV1, MTCNN
+from facenet_pytorch.models.utils.detect_face import crop_resize, get_size, save_img
+from torchvision.transforms import functional as F
-class TrainedFace:
+@dataclass
+class ROI:
"""
- This class serves as a custom struct to store information of people we recognized
+ ROI class that holds a 'region of interest' of an image
"""
- def __init__(self, label):
- """
- A custom struct to store the names and the embedded representations (tensors) of people
- """
- self.label = label
- self.representations = []
+ x_offset: int
+ y_offset: int
+ width: int
+ height: int
+
+
+def extract_face(img, box, image_size=160, margin=0, save_path=None) -> tuple[torch.Tensor, list[int]]:
+ """Extract face + margin from PIL Image given bounding box.
+
+ Arguments:
+        img {PIL.Image or numpy.ndarray} -- Input image.
+ box {numpy.ndarray} -- Four-element bounding box.
+ image_size {int} -- Output image size in pixels. The image will be square.
+ margin {int} -- Margin to add to bounding box, in terms of pixels in the final image.
+ Note that the application of the margin differs slightly from the davidsandberg/facenet
+ repo, which applies the margin to the original image before resizing, making the margin
+ dependent on the original image size.
+ save_path {str} -- Save path for extracted face image. (default: {None})
+
+ Returns:
+ torch.tensor -- tensor representing the extracted face.
+ list -- list of ROI corners
+ """
+ margin = [
+ margin * (box[2] - box[0]) / (image_size - margin),
+ margin * (box[3] - box[1]) / (image_size - margin),
+ ]
+ raw_image_size = get_size(img)
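+    # Pad the box by half the margin on each side and clamp it to the image bounds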
+ box = [
+ int(max(box[0] - margin[0] / 2, 0)),
+ int(max(box[1] - margin[1] / 2, 0)),
+ int(min(box[2] + margin[0] / 2, raw_image_size[0])),
+ int(min(box[3] + margin[1] / 2, raw_image_size[1])),
+ ]
- def get_label(self) -> str:
- """
- A getter for the labels of the struct
- """
- return self.label
+ face = crop_resize(img, box, image_size)
- def get_representations(self):
+ if save_path is not None:
+ Path(save_path).parent.mkdir(parents=True, exist_ok=True)
+ save_img(face, save_path)
+
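+    # Convert the cropped face to a float32 CHW tensor for the embedding network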
+ face = F.to_tensor(np.float32(face))
+
+ return face, box
+
+
+@dataclass
+class L2Distance:
+ """
+ L2 Distance that holds a l2 distance and an associated label
+
+ Args:
+ distance: the l2 distance
+ label: the label
+ """
+
+ distance: float
+ label: str
+
+
+class RecognizedFace:
+ def __init__(
+ self,
+ image: torch.Tensor,
+ roi: ROI,
+ ):
"""
- A getter for the embeddings of the struct
+        A recognized face in an image
+
+        :param image: The cropped face image as a [C, H, W] tensor
+        :param roi: The region of interest of the face in the original image
"""
- return self.representations
+ self._image = image
+ self._roi: ROI = roi
+ self.l2_distances: list[L2Distance] = []
+
+ @property
+ def image(self) -> np.ndarray:
+ return self._image.permute(1, 2, 0).detach().cpu().numpy()
+
+ @property
+ def roi(self) -> ROI:
+ return self._roi
+
+ def __repr__(self):
+ return f"RecognizedFace(image.shape={self.image.shape}, l2_distances={self.l2_distances})"
+
+
+@dataclass
+class TrainedFace:
+ """
+ This class serves as a custom struct to store information of people we recognized
+ """
+
+ label: str
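+    # One embedding vector is stored per training sample of this person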
+ representations: List[torch.Tensor] = field(default_factory=list)
class FaceRecognizer:
@@ -40,11 +158,10 @@ def __init__(self):
"""
Constructor for the list which contains the TrainedFace structure
"""
- self._trained_faces = []
- self.device = torch.device(
- "cuda:0" if torch.cuda.is_available() else "cpu")
- rospy.loginfo(f"Running on device: {self.device}")
- self.mtcnn = MTCNN(
+ self._trained_faces: list[TrainedFace] = []
+ self._device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
+ rospy.loginfo(f"Running on device: {self._device}")
+ self._mtcnn = MTCNN(
keep_all=True,
image_size=160,
margin=0,
@@ -52,31 +169,87 @@ def __init__(self):
thresholds=[0.6, 0.7, 0.7],
factor=0.709,
post_process=True,
- device=self.device,
+ device=self._device,
)
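+        # InceptionResnetV1 pretrained on VGGFace2 maps each 160x160 face crop to a 512-dimensional embedding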
+ self._resnet = InceptionResnetV1(pretrained="vggface2", device=self._device).eval()
- def face_detection(self, img: np.ndarray) -> List[Union[Tuple[float, float, float, float], None]]:
+ def _update_with_categorical_distribution(self, recognition: RecognizedFace) -> RecognizedFace:
"""
- Returns the index of the trained face
+ Update the recognition with a categorical distribution of the trained faces
+
+ :param recognition: Input recognition
+ :return: Output recognition with an updated categorical distribution
+ """
+ if self._trained_faces:
+ # Try to get a representation of the detected face
+ recognition_embedding = None
+ try:
+ recognition_embedding = self._get_embedding(torch.stack([recognition._image], dim=0)).squeeze()
+ except Exception as e:
+ rospy.logerr(f"Error getting the embedding: {e}")
+
+ if recognition_embedding is not None:
+ # Calculate the L2 distance between the embedding and all the stored representations.
+ recognition.l2_distances = [
+ L2Distance(self._get_min_l2_distance(face.representations, recognition_embedding), face.label)
+ for face in self._trained_faces
+ ]
+
+ return recognition
+
+    def _get_recognized_face(self, img: np.ndarray, bbox: np.ndarray) -> RecognizedFace:
+ face, bbox = extract_face(img, bbox, self._mtcnn.image_size, self._mtcnn.margin)
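+        # Convert the (x1, y1, x2, y2) corners returned by extract_face to an offset/size ROI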
+ x_offset = bbox[0]
+ y_offset = bbox[1]
+ w = bbox[2] - bbox[0]
+ h = bbox[3] - bbox[1]
+
+ return RecognizedFace(face, ROI(x_offset, y_offset, w, h))
+
+    def _get_embedding(self, face_tensors: torch.Tensor) -> np.ndarray:
+        """
+        Get the embedding vectors for a batch of cropped faces
-        :param img: inpute image
-        :return: the bounding boxes of coordinations of the faces it detects
+        :param face_tensors: The cropped and resized faces [batch_size, C, H, W]
+        :return: The embedding vectors [batch_size, embedding_dim]
         """
+        # Run the embedding network on the configured device and return a CPU numpy array
+        return self._resnet(face_tensors.to(self._device)).detach().cpu().numpy()
- # Keep the landmarks for future use
- boxes, _, landmarks = self.mtcnn.detect(img, landmarks=True)
- return boxes
+ def detect(self, img: np.ndarray) -> List[RecognizedFace]:
+ """Detect faces in an image
- def _get_dists(self, embeddings: List[torch.Tensor]) -> Tuple[List[float], List[str]]:
+ :param img: input image in bgr
+ :return: The detected faces
+ """
+        bboxes, probs, _ = self._mtcnn.detect(img, landmarks=True)
+        if bboxes is None:
+            # MTCNN returns None instead of an empty array when no faces are found
+            return []
+
+ # Extract faces
+ recognized_faces = [self._get_recognized_face(img, bbox) for bbox in bboxes]
+ recognized_faces = [self._update_with_categorical_distribution(face) for face in recognized_faces]
+ return recognized_faces
+
+ @staticmethod
+ def _get_min_l2_distance(olds: List[np.ndarray], new: np.ndarray) -> float:
+ """
+        Calculate the minimal squared l2 distance between a list of stored vectors and a new vector
+
+        :param olds: List of np.ndarray vectors
+        :param new: np.ndarray vector
+        :return: Minimal squared l2 distance
+        """
+        # np.dot(d, d) is the squared l2 norm of the difference vector d
+        return min(np.dot(old - new, old - new) for old in olds)
+
+ def _get_distances(self, embeddings: List[torch.Tensor]) -> Tuple[List[float], List[str]]:
"""
-        Returns min l2 distance of the identified face (compared with the database of know faces)
+        Returns min l2 distance of the identified face (compared with the database of known faces)
- :param embeddings: the embedded representation(s) of a face(s)
+ :param embeddings: The embedded representation(s) of a face(s)
:return: the min distance(s) of the embedded vector compared with the database faces from the corresponding label(s)
"""
- dist_per_emb_final = []
- dist = []
- dist_per_emb = []
+        distance_per_emb_final = []
min_of_emb_final = []
@@ -84,22 +257,24 @@ def _get_dists(self, embeddings: List[torch.Tensor]) -> Tuple[List[float], List[
min_value_list_per_emb = []
# Calculate the L2 distance between the embedding and all the stored representations.
- for idx, emb in enumerate(embeddings):
- for face in self._trained_faces:
- for rep in face.representations:
- dist_per_emb.append(abs(rep - emb).norm().item())
- dist.append(dist_per_emb)
- rospy.loginfo(f"{dist_per_emb} dist_per_emb for embedded with index {idx}")
- dist_per_emb = []
- dist_per_emb_final.append(dist)
- dist = []
-
- rospy.loginfo(f"{dist_per_emb_final} dist_per_emb_final")
+        for idx, emb in enumerate(embeddings):  # New measurements
+            # Minimal squared l2 distance from this embedding to the representations of each trained face
+            distance_per_emb = [
+                [self._get_min_l2_distance(face.representations, emb)]
+                for face in self._trained_faces
+            ]
+            rospy.loginfo(f"{distance_per_emb} distance_per_emb for embedding with index {idx}")
+            distance_per_emb_final.append(distance_per_emb)
+
+        rospy.loginfo(f"{distance_per_emb_final=}")
# Calculate the minimum distance for each labeled embedding
- # e.g min distance of all observation of label "Jake"
- for dist in dist_per_emb_final:
- min_of_emb = [min(j) for j in dist]
+ # e.g. min distance of all observations of label "Jake"
+ for distance in distance_per_emb_final:
+ min_of_emb = [min(j) for j in distance]
rospy.loginfo(f"{min_of_emb} min_of_emb")
min_of_emb_final.append(min_of_emb)
rospy.loginfo(f"{min_of_emb_final} min_of_emb_final")
@@ -110,27 +285,25 @@ def _get_dists(self, embeddings: List[torch.Tensor]) -> Tuple[List[float], List[
min_index_list_per_emb.append(value.index(min(value)))
min_value_list_per_emb.append(min(value))
rospy.loginfo(f"{min_index_list_per_emb}, min_index_list_per_emb")
- rospy.loginfo(f"{min_value_list_per_emb}, min_index_list")
- labelling = [self._trained_faces[i].get_label()
- for i in min_index_list_per_emb]
+ labelling = [self._trained_faces[i].label for i in min_index_list_per_emb]
rospy.loginfo(f"{labelling}, {min_value_list_per_emb}")
return min_value_list_per_emb, labelling
- def threhold_check(self, dist: List[float], labelling: List[str], labels: List[str], threshold: float) -> None:
+ def threshold_check(self, dist: List[float], labelling: List[str], labels: List[str], threshold: float) -> None:
"""
Updates the database with a new addition if the minimum distance is greater than the threshold.
-        :param dist: the list of minumum l2 norms of the embedded vectors compared with the database faces
+        :param dist: the list of minimum l2 norms of the embedded vectors compared with the database faces
- :param labelling: the corresponding labels of the chosen minimum face representations.
+ :param labelling: the corresponding labels of the chosen minimum face representations.
:param labels: the general list of labels
:param threshold: the threshold value which denotes if a face is new or not
"""
for idx, dis in enumerate(dist):
rospy.loginfo(f"distances are {dist} and labels are {labelling}")
if dis > threshold:
- labelling[idx] = labels[idx] # you can always condider the last label or something similar
+            labelling[idx] = labels[idx]  # you could always consider the last label or something similar
rospy.loginfo(
f"Distance is >1 so assign new label: {self._trained_faces[-1].get_label()}, \
Representations: {len(self._trained_faces[-1].get_representations())}"
@@ -138,41 +311,38 @@ def threhold_check(self, dist: List[float], labelling: List[str], labels: List[s
else:
rospy.loginfo(f"Distance is <1 so no new label is needed")
- def detection_recognition(self, img: np.ndarray, labels: List[str], train: bool) -> Tuple[List[float], List[str]]:
+ def recognize(self, img: np.ndarray) -> List[RecognizedFace]:
"""
- Returns min l2 distance of the identified face (compared with the database of know faces)
+ Returns min l2 distance of the identified face (compared with the database of known faces)
- :param img: the target image collected from camera
- :param labels: NA/ a list with all labels (this will be used during integration)
- :param train: flag to train during inference time
+ :param img:
:return: the min distance(s) of the embedded vector compared with the database faces
:return: the corresponding label(s)
"""
- resnet = InceptionResnetV1(
- pretrained="vggface2").eval().to(self.device)
- pred_aligned = self.mtcnn(img)
- pred_aligned = pred_aligned.cuda()
- embeddings = resnet(pred_aligned).detach().cpu()
- rospy.loginfo(
- f"{embeddings.size()}, {type(embeddings)}, embeddings size")
-
- if not self._trained_faces:
- for nam, emb in enumerate(embeddings):
- index = self._get_trained_face_index(labels[nam])
- if index == -1:
- self._trained_faces.append(TrainedFace(labels[nam]))
-
- self._trained_faces[index].representations.append(emb)
-        # Calculate the L2 norm and check if the distance is bigger than 1 (face that we have not seen yet)
-        dist, labelling = self._get_dists(embeddings)
-        self.threhold_check(dist, labelling, labels, threshold=1)
-        if train:
-            for idx_label, emb in enumerate(embeddings):
-                rospy.loginfo(f"{idx_label}, idx_label")
-                self.train(emb, labelling[idx_label])
-        rospy.loginfo(f"{len(self._trained_faces)}")
-        return dist, labelling
+        return self.detect(img)
@@ -181,14 +351,19 @@ def _get_trained_face_index(self, label: str) -> int:
Returns the index of the trained face
:param label: label of the trained face
- :return: the index of the face in the self._trained faces list
+        :return: the index of the face in the self._trained_faces list, -1 if not found
"""
- for i, f in enumerate(self._trained_faces):
+ for idx, f in enumerate(self._trained_faces):
if f.label == label:
- return i
+ return idx
return -1
- def train(self, face_representation: np.ndarray, name: str) -> None:
+    def train(self, img: np.ndarray, name: str) -> None:
+        # MTCNN (keep_all=True) returns all detected faces; train on the first one
+        faces = self._mtcnn(img)
+        if faces is None:
+            raise RuntimeError(f"No face detected in the training image for '{name}'")
+        embedding = self._get_embedding(torch.stack([faces[0]], dim=0)).squeeze()
+        self._train_impl(embedding, name)
+
+ def _train_impl(self, face_representation: np.ndarray, name: str) -> None:
"""
Adds a face to the trained faces, creates a vector representation and adds this
@@ -197,18 +372,17 @@ def train(self, face_representation: np.ndarray, name: str) -> None:
"""
index = self._get_trained_face_index(name)
if index == -1:
- rospy.loginfo(f"We do not know this face")
self._trained_faces.append(TrainedFace(name))
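+        # index == -1 also points at the face just appended, thanks to Python's negative indexing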
self._trained_faces[index].representations.append(face_representation)
rospy.loginfo(f"Trained faces:")
for trained_face in self._trained_faces:
rospy.loginfo(
- f"Label: {trained_face.get_label()}, Representations: {len(trained_face.get_representations())}"
+ f"Label: {trained_face.label}, Representations: {len(trained_face.representations)}"
)
- def save_trained_faces(self, file_name):
+    def save_trained_faces(self, file_name: BinaryIO):
pickle.dump(self._trained_faces, file_name)
- def restore_trained_faces(self, file_name):
+    def restore_trained_faces(self, file_name: BinaryIO):
self._trained_faces = pickle.load(file_name)