diff --git a/.gitignore b/.gitignore index 5f968990..10034c26 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,4 @@ docs/_build/ .idea /.spyproject .spyproject +hailort.log diff --git a/examples/hailo/coco.txt b/examples/hailo/coco.txt new file mode 100644 index 00000000..1f42c8eb --- /dev/null +++ b/examples/hailo/coco.txt @@ -0,0 +1,80 @@ +person +bicycle +car +motorcycle +airplane +bus +train +truck +boat +traffic light +fire hydrant +stop sign +parking meter +bench +bird +cat +dog +horse +sheep +cow +elephant +bear +zebra +giraffe +backpack +umbrella +handbag +tie +suitcase +frisbee +skis +snowboard +sports ball +kite +baseball bat +baseball glove +skateboard +surfboard +tennis racket +bottle +wine glass +cup +fork +knife +spoon +bowl +banana +apple +sandwich +orange +broccoli +carrot +hot dog +pizza +donut +cake +chair +couch +potted plant +bed +dining table +toilet +tv +laptop +mouse +remote +keyboard +cell phone +microwave +oven +toaster +sink +refrigerator +book +clock +vase +scissors +teddy bear +hair drier +toothbrush \ No newline at end of file diff --git a/examples/hailo/detect.py b/examples/hailo/detect.py new file mode 100755 index 00000000..6327a7f8 --- /dev/null +++ b/examples/hailo/detect.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 + +"""Example module for Hailo Detection.""" + +import argparse + +import cv2 + +from picamera2 import MappedArray, Picamera2, Preview +from picamera2.devices import Hailo + + +def extract_detections(hailo_output, w, h, class_names, threshold=0.5): + """Extract detections from the HailoRT-postprocess output.""" + results = [] + for class_id, detections in enumerate(hailo_output): + for detection in detections: + score = detection[4] + if score >= threshold: + y0, x0, y1, x1 = detection[:4] + bbox = (int(x0 * w), int(y0 * h), int(x1 * w), int(y1 * h)) + results.append([class_names[class_id], bbox, score]) + return results + + +def draw_objects(request): + current_detections = detections + if current_detections: + with MappedArray(request, "main") as m: + for class_name, bbox, score in current_detections: + x0, y0, x1, y1 = bbox + label = f"{class_name} %{int(score * 100)}" + cv2.rectangle(m.array, (x0, y0), (x1, y1), (0, 255, 0, 0), 2) + cv2.putText(m.array, label, (x0 + 5, y0 + 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0, 0), 1, cv2.LINE_AA) + + +if __name__ == "__main__": + # Parse command-line arguments. + parser = argparse.ArgumentParser(description="Detection Example") + parser.add_argument("-m", "--model", help="Path for the HEF model.", + default="/usr/share/hailo-models/yolov8s_h8l.hef") + parser.add_argument("-l", "--labels", default="coco.txt", + help="Path to a text file containing labels.") + parser.add_argument("-s", "--score_thresh", type=float, default=0.5, + help="Score threshold, must be a float between 0 and 1.") + args = parser.parse_args() + + # Get the Hailo model, the input size it wants, and the size of our preview stream. + with Hailo(args.model) as hailo: + model_h, model_w, _ = hailo.get_input_shape() + video_w, video_h = 1280, 960 + + # Load class names from the labels file + with open(args.labels, 'r', encoding="utf-8") as f: + class_names = f.read().splitlines() + + # The list of detected objects to draw. + detections = None + + # Configure and start Picamera2. 
+ with Picamera2() as picam2: + main = {'size': (video_w, video_h), 'format': 'XRGB8888'} + lores = {'size': (model_w, model_h), 'format': 'RGB888'} + controls = {'FrameRate': 30, 'LensPosition': 4} + config = picam2.create_preview_configuration(main, lores=lores, controls=controls) + picam2.configure(config) + + picam2.start_preview(Preview.QTGL, x=0, y=0, width=video_w, height=video_h) + picam2.start() + picam2.pre_callback = draw_objects + + # Process each low resolution camera frame. + while True: + frame = picam2.capture_array('lores') + + # Run inference on the preprocessed frame + results = hailo.run(frame) + + # Extract detections from the inference results + detections = extract_detections(results[0], video_w, video_h, class_names, args.score_thresh) diff --git a/examples/hailo/pose.py b/examples/hailo/pose.py new file mode 100755 index 00000000..85276847 --- /dev/null +++ b/examples/hailo/pose.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +import argparse + +import cv2 +from pose_utils import postproc_yolov8_pose + +from picamera2 import MappedArray, Picamera2, Preview +from picamera2.devices import Hailo + +parser = argparse.ArgumentParser(description='Pose estimation using Hailo') +parser.add_argument('-m', '--model', help="HEF file path", default="/usr/share/hailo-models/yolov8s_pose_h8l_pi.hef") +args = parser.parse_args() + +NOSE, L_EYE, R_EYE, L_EAR, R_EAR, L_SHOULDER, R_SHOULDER, L_ELBOW, R_ELBOW, \ + L_WRIST, R_WRIST, L_HIP, R_HIP, L_KNEE, R_KNEE, L_ANKLE, R_ANKLE = range(17) + +JOINT_PAIRS = [[NOSE, L_EYE], [L_EYE, L_EAR], [NOSE, R_EYE], [R_EYE, R_EAR], + [L_SHOULDER, R_SHOULDER], + [L_SHOULDER, L_ELBOW], [L_ELBOW, L_WRIST], [R_SHOULDER, R_ELBOW], [R_ELBOW, R_WRIST], + [L_SHOULDER, L_HIP], [R_SHOULDER, R_HIP], [L_HIP, R_HIP], + [L_HIP, L_KNEE], [R_HIP, R_KNEE], [L_KNEE, L_ANKLE], [R_KNEE, R_ANKLE]] + + +def visualize_pose_estimation_result(results, image, model_size, detection_threshold=0.5, joint_threshold=0.5): + image_size = (image.shape[1], image.shape[0]) + + def scale_coord(coord): + return tuple([int(c * t / f) for c, f, t in zip(coord, model_size, image_size)]) + + bboxes, scores, keypoints, joint_scores = ( + results['bboxes'], results['scores'], results['keypoints'], results['joint_scores']) + box, score, keypoint, keypoint_score = bboxes[0], scores[0], keypoints[0], joint_scores[0] + + for detection_box, detection_score, detection_keypoints, detection_keypoints_score in ( + zip(box, score, keypoint, keypoint_score)): + if detection_score < detection_threshold: + continue + + coord_min = scale_coord(detection_box[:2]) + coord_max = scale_coord(detection_box[2:]) + cv2.rectangle(image, coord_min, coord_max, (255, 0, 0), 1) + cv2.putText(image, str(detection_score), coord_min, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (36, 255, 12), 1) + + joint_visible = detection_keypoints_score > joint_threshold + + detection_keypoints = detection_keypoints.reshape(17, 2) + for joint, joint_score in zip(detection_keypoints, detection_keypoints_score): + if joint_score > joint_threshold: + cv2.circle(image, scale_coord(joint), 4, (255, 0, 255), -1) + + for joint0, joint1 in JOINT_PAIRS: + if joint_visible[joint0] and joint_visible[joint1]: + cv2.line(image, scale_coord(detection_keypoints[joint0]), + scale_coord(detection_keypoints[joint1]), (255, 0, 255), 3) + + +def draw_predictions(request): + with MappedArray(request, 'main') as m: + predictions = last_predictions + if predictions: + visualize_pose_estimation_result(predictions, m.array, model_size) + + +# ---------------- Start of 
the example --------------------- # + +last_predictions = None + +with Hailo(args.model) as hailo: + main_size = (1024, 768) + model_h, model_w, _ = hailo.get_input_shape() + model_size = lores_size = (model_w, model_h) + + with Picamera2() as picam2: + main = {'size': main_size, 'format': 'XRGB8888'} + lores = {'size': lores_size, 'format': 'RGB888'} + config = picam2.create_video_configuration(main, lores=lores) + picam2.configure(config) + + picam2.start_preview(Preview.QTGL, x=0, y=0, width=main_size[0], height=main_size[1]) + picam2.start() + picam2.pre_callback = draw_predictions + + while True: + frame = picam2.capture_array('lores') + + # Do pose estimation. + raw_detections = hailo.run(frame) + + # Tidy up the predictions. num_of_classes is always 1 (?). + last_predictions = postproc_yolov8_pose(1, raw_detections, model_size) diff --git a/examples/hailo/pose_utils.py b/examples/hailo/pose_utils.py new file mode 100644 index 00000000..39b52f98 --- /dev/null +++ b/examples/hailo/pose_utils.py @@ -0,0 +1,290 @@ +import numpy as np + +kwargs = { + 'classes': 1, + 'nms_max_output_per_class': 300, + 'anchors': {'regression_length': 15, 'strides': [8, 16, 32]}, + 'score_threshold': 0.001, + 'nms_iou_thresh': 0.7, + 'meta_arch': 'nanodet_v8', + 'device_pre_post_layers': None +} + + +def postproc_yolov8_pose(num_of_classes, raw_detections, img_size): + # The input is a dictionary of outputs for each layer. For each layer we may have: + # A single numpy array, if batching was not used. + # A list of numpy arrays, when a batch size was specified. + # We convert the "list" into an extra numpy dimensions, which is what the code here expects. + for layer, output in raw_detections.items(): + if not isinstance(output, list): + raw_detections[layer] = np.expand_dims(output, axis=0) + elif len(output) == 1: + raw_detections[layer] = np.expand_dims(output[0], axis=0) + else: + raise RuntimeError("Pose post-processing only supports a batch size of 1") + + kwargs['img_dims'] = img_size + raw_detections_keys = list(raw_detections.keys()) + layer_from_shape: dict = {raw_detections[key].shape: key for key in raw_detections_keys} + + detection_output_channels = (kwargs['anchors']['regression_length'] + 1) * 4 # (regression length + 1) * num_coordinates + keypoints = 51 + + # The following assumes that the batch size is 1: + endnodes = [raw_detections[layer_from_shape[1, 20, 20, detection_output_channels]], + raw_detections[layer_from_shape[1, 20, 20, num_of_classes]], + raw_detections[layer_from_shape[1, 20, 20, keypoints]], + raw_detections[layer_from_shape[1, 40, 40, detection_output_channels]], + raw_detections[layer_from_shape[1, 40, 40, num_of_classes]], + raw_detections[layer_from_shape[1, 40, 40, keypoints]], + raw_detections[layer_from_shape[1, 80, 80, detection_output_channels]], + raw_detections[layer_from_shape[1, 80, 80, num_of_classes]], + raw_detections[layer_from_shape[1, 80, 80, keypoints]]] + + predictions_dict = yolov8_pose_estimation_postprocess(endnodes, **kwargs) + + return predictions_dict + + +# ---------------- Architecture functions ----------------- # + +def _sigmoid(x): + return 1 / (1 + np.exp(-x)) + + +def _softmax(x): + return np.exp(x) / np.expand_dims(np.sum(np.exp(x), axis=-1), axis=-1) + + +def max(a, b): + return a if a >= b else b + + +def min(a, b): + return a if a <= b else b + + +def nms(dets, thresh): + x1 = dets[:, 0] + y1 = dets[:, 1] + x2 = dets[:, 2] + y2 = dets[:, 3] + scores = dets[:, 4] + + areas = (x2 - x1 + 1) * (y2 - y1 + 1) + order = 
np.argsort(scores)[::-1] + + ndets = dets.shape[0] + suppressed = np.zeros((ndets), dtype=int) + + for _i in range(ndets): + i = order[_i] + if suppressed[i] == 1: + continue + ix1 = x1[i] + iy1 = y1[i] + ix2 = x2[i] + iy2 = y2[i] + iarea = areas[i] + for _j in range(_i + 1, ndets): + j = order[_j] + if suppressed[j] == 1: + continue + xx1 = max(ix1, x1[j]) + yy1 = max(iy1, y1[j]) + xx2 = min(ix2, x2[j]) + yy2 = min(iy2, y2[j]) + w = max(0.0, xx2 - xx1 + 1) + h = max(0.0, yy2 - yy1 + 1) + inter = w * h + ovr = inter / (iarea + areas[j] - inter) + if ovr >= thresh: + suppressed[j] = 1 + + return np.where(suppressed == 0)[0] + + +def _yolov8_decoding(raw_boxes, raw_kpts, strides, image_dims, reg_max): + boxes = None + decoded_kpts = None + + for box_distribute, kpts, stride, _ in zip(raw_boxes, raw_kpts, strides, np.arange(3)): + # create grid + shape = [int(x / stride) for x in image_dims] + grid_x = np.arange(shape[1]) + 0.5 + grid_y = np.arange(shape[0]) + 0.5 + grid_x, grid_y = np.meshgrid(grid_x, grid_y) + ct_row = grid_y.flatten() * stride + ct_col = grid_x.flatten() * stride + center = np.stack((ct_col, ct_row, ct_col, ct_row), axis=1) + + # box distribution to distance + reg_range = np.arange(reg_max + 1) + box_distribute = np.reshape( + box_distribute, (-1, box_distribute.shape[1] * box_distribute.shape[2], 4, reg_max + 1)) + box_distance = _softmax(box_distribute) + box_distance = box_distance * np.reshape(reg_range, (1, 1, 1, -1)) + box_distance = np.sum(box_distance, axis=-1) + box_distance = box_distance * stride + + # decode box + box_distance = np.concatenate([box_distance[:, :, :2] * (-1), box_distance[:, :, 2:]], axis=-1) + decode_box = np.expand_dims(center, axis=0) + box_distance + + xmin = decode_box[:, :, 0] + ymin = decode_box[:, :, 1] + xmax = decode_box[:, :, 2] + ymax = decode_box[:, :, 3] + decode_box = np.transpose([xmin, ymin, xmax, ymax], [1, 2, 0]) + + xywh_box = np.transpose([(xmin + xmax) / 2, (ymin + ymax) / 2, xmax - xmin, ymax - ymin], [1, 2, 0]) + boxes = xywh_box if boxes is None else np.concatenate([boxes, xywh_box], axis=1) + + # kpts decoding + kpts[..., :2] *= 2 + kpts[..., :2] = stride * (kpts[..., :2] - 0.5) + np.expand_dims(center[..., :2], axis=1) + + decoded_kpts = kpts if decoded_kpts is None else np.concatenate([decoded_kpts, kpts], axis=1) + + return boxes, decoded_kpts + + +def xywh2xyxy(x): + y = np.copy(x) + y[:, 0] = x[:, 0] - x[:, 2] / 2 + y[:, 1] = x[:, 1] - x[:, 3] / 2 + y[:, 2] = x[:, 0] + x[:, 2] / 2 + y[:, 3] = x[:, 1] + x[:, 3] / 2 + return y + + +def non_max_suppression(prediction, conf_thres=0.1, iou_thres=0.45, + max_det=100, n_kpts=17): + """Non-Maximum Suppression (NMS) on inference results to reject overlapping detections. 
+
+    Args:
+        prediction: numpy.ndarray with shape (batch_size, num_proposals, 4 + num_classes + 3 * n_kpts),
+            which is (batch_size, num_proposals, 56) for a single class and 17 keypoints
+        conf_thres: confidence threshold for NMS
+        iou_thres: IoU threshold for NMS
+        max_det: Maximal number of detections to keep after NMS
+        n_kpts: number of keypoints per detection
+
+    Returns:
+        A list of per-image detections, where each is a dictionary with the following structure:
+        {
+            'bboxes': numpy.ndarray with shape (num_detections, 4),
+            'keypoints': numpy.ndarray with shape (num_detections, n_kpts, 3),
+            'scores': numpy.ndarray with shape (num_detections,),
+            'num_detections': int
+        }
+    """
+    assert 0 <= conf_thres <= 1, \
+        f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
+    assert 0 <= iou_thres <= 1, \
+        f'Invalid IoU threshold {iou_thres}, valid values are between 0.0 and 1.0'
+
+    nc = prediction.shape[2] - n_kpts * 3 - 4  # number of classes
+    xc = prediction[..., 4] > conf_thres  # candidates
+
+    # max_wh = 7680  # (pixels) maximum box width and height
+    ki = 4 + nc  # keypoints start index
+    output = []
+    for xi, x in enumerate(prediction):  # image index, image inference
+        x = x[xc[xi]]
+        # If none remain process next image
+        if not x.shape[0]:
+            output.append({'bboxes': np.zeros((0, 4)),
+                           'keypoints': np.zeros((0, n_kpts, 3)),
+                           'scores': np.zeros((0)),
+                           'num_detections': 0})
+            continue
+
+        # (center_x, center_y, width, height) to (x1, y1, x2, y2)
+        boxes = xywh2xyxy(x[:, :4])
+        kpts = x[:, ki:]
+
+        conf = np.expand_dims(x[:, 4:ki].max(1), 1)
+        j = np.expand_dims(x[:, 4:ki].argmax(1), 1).astype(np.float32)
+
+        keep = np.squeeze(conf, 1) > conf_thres
+        x = np.concatenate((boxes, conf, j, kpts), 1)[keep]
+
+        # sort by confidence
+        x = x[x[:, 4].argsort()[::-1]]
+
+        boxes = x[:, :4]
+        conf = x[:, 4:5]
+        preds = np.hstack([boxes.astype(np.float32), conf.astype(np.float32)])
+
+        keep = nms(preds, iou_thres)
+        if keep.shape[0] > max_det:
+            keep = keep[:max_det]
+
+        out = x[keep]
+        scores = out[:, 4]
+        boxes = out[:, :4]
+        kpts = out[:, 6:]
+        kpts = np.reshape(kpts, (-1, n_kpts, 3))
+
+        out = {'bboxes': boxes,
+               'keypoints': kpts,
+               'scores': scores,
+               'num_detections': int(scores.shape[0])}
+
+        output.append(out)
+    return output
+
+
+def yolov8_pose_estimation_postprocess(endnodes, **kwargs):
+    """Decode and run NMS on Yolov8 pose estimation output.
+
+    endnodes is a list of 9 tensors:
+        endnodes[0]: bbox output with shape (BS, 20, 20, 64)
+        endnodes[1]: scores output with shape (BS, 20, 20, num_classes)
+        endnodes[2]: keypoints output with shape (BS, 20, 20, 51)
+        endnodes[3]: bbox output with shape (BS, 40, 40, 64)
+        endnodes[4]: scores output with shape (BS, 40, 40, num_classes)
+        endnodes[5]: keypoints output with shape (BS, 40, 40, 51)
+        endnodes[6]: bbox output with shape (BS, 80, 80, 64)
+        endnodes[7]: scores output with shape (BS, 80, 80, num_classes)
+        endnodes[8]: keypoints output with shape (BS, 80, 80, 51)
+
+    Returns:
+        A dictionary of batched numpy arrays with the following structure:
+        {
+            'bboxes': numpy.ndarray with shape (batch_size, max_detections, 4),
+            'keypoints': numpy.ndarray with shape (batch_size, max_detections, 17, 2),
+            'joint_scores': numpy.ndarray with shape (batch_size, max_detections, 17, 1),
+            'scores': numpy.ndarray with shape (batch_size, max_detections, 1)
+        }
+    """
+    batch_size = endnodes[0].shape[0]
+    num_classes = kwargs['classes']  # always 1
+    max_detections = kwargs['nms_max_output_per_class']
+    strides = kwargs['anchors']['strides'][::-1]
+    image_dims = tuple(kwargs['img_dims'])
+    reg_max = kwargs['anchors']['regression_length']
+    raw_boxes = endnodes[:7:3]
+    scores = [np.reshape(s, (-1, s.shape[1] * s.shape[2], num_classes)) for s in endnodes[1:8:3]]
+    scores = np.concatenate(scores, axis=1)
+    kpts = [np.reshape(c, (-1, c.shape[1] * c.shape[2], 17, 3)) for c in endnodes[2:9:3]]
+    decoded_boxes, decoded_kpts = _yolov8_decoding(raw_boxes, kpts, strides, image_dims, reg_max)
+    score_thres = kwargs['score_threshold']
+    iou_thres = kwargs['nms_iou_thresh']
+
+    decoded_kpts = np.reshape(decoded_kpts, (batch_size, -1, 51))
+    predictions = np.concatenate([decoded_boxes, scores, decoded_kpts], axis=2)
+    nms_res = non_max_suppression(predictions, conf_thres=score_thres, iou_thres=iou_thres, max_det=max_detections)
+    output = {}
+    output['bboxes'] = np.zeros((batch_size, max_detections, 4))
+    output['keypoints'] = np.zeros((batch_size, max_detections, 17, 2))
+    output['joint_scores'] = np.zeros((batch_size, max_detections, 17, 1))
+    output['scores'] = np.zeros((batch_size, max_detections, 1))
+    for b in range(batch_size):
+        output['bboxes'][b, :nms_res[b]['num_detections']] = nms_res[b]['bboxes']
+        output['keypoints'][b, :nms_res[b]['num_detections']] = nms_res[b]['keypoints'][..., :2]
+        output['joint_scores'][b, :nms_res[b]['num_detections'], ..., 0] = _sigmoid(nms_res[b]['keypoints'][..., 2])
+        output['scores'][b, :nms_res[b]['num_detections'], ..., 0] = nms_res[b]['scores']
+    return output
diff --git a/picamera2/devices/__init__.py b/picamera2/devices/__init__.py
index 6157cbb4..a8ccb03b 100644
--- a/picamera2/devices/__init__.py
+++ b/picamera2/devices/__init__.py
@@ -1 +1,6 @@
+try:
+    # Hailo requires hailo_platform package, which may not be installed on non-Hailo platforms.
+    from .hailo import Hailo
+except ModuleNotFoundError:
+    pass
 from .imx708 import IMX708
diff --git a/picamera2/devices/hailo/__init__.py b/picamera2/devices/hailo/__init__.py
new file mode 100644
index 00000000..abebb8c3
--- /dev/null
+++ b/picamera2/devices/hailo/__init__.py
@@ -0,0 +1 @@
+from .hailo import Hailo
diff --git a/picamera2/devices/hailo/hailo.py b/picamera2/devices/hailo/hailo.py
new file mode 100644
index 00000000..bbbd0aaa
--- /dev/null
+++ b/picamera2/devices/hailo/hailo.py
@@ -0,0 +1,178 @@
+from concurrent.futures import Future
+from functools import partial
+
+import numpy as np
+from hailo_platform import HEF, FormatType, HailoSchedulingAlgorithm, VDevice
+
+
+class Hailo:
+    def __init__(self, hef_path, batch_size=None, output_type='FLOAT32'):
+        """
+        Initialize the Hailo class with the provided HEF model file path.
+
+        Args:
+            hef_path (str): Path to the HEF model file.
+            batch_size (int): Batch size for inference, or None (the default) for no batching.
+            output_type (str): Format type of the output stream.
+        """
+        params = VDevice.create_params()
+        params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
+
+        self.batch_size = batch_size
+        self.hef = HEF(hef_path)
+        self.target = VDevice(params)
+        self.infer_model = self.target.create_infer_model(hef_path)
+        self.infer_model.set_batch_size(1 if batch_size is None else batch_size)
+        self._set_input_output(output_type)
+        self.input_vstream_info, self.output_vstream_info = self._get_vstream_info()
+        self.configured_infer_model = self.infer_model.configure()
+
+    def __enter__(self):
+        """Allow the Hailo object to be used as a context manager."""
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_traceback):
+        """Allow the Hailo object to be used as a context manager."""
+        self.close()
+
+    def _set_input_output(self, output_type):
+        """
+        Set the input and output layer information for the HEF model.
+
+        Args:
+            output_type (str): Format type of the output stream.
+        """
+        input_format_type = self.hef.get_input_vstream_infos()[0].format.type
+        self.infer_model.input().set_format_type(input_format_type)
+        output_format_type = getattr(FormatType, output_type)
+        for output in self.infer_model.outputs:
+            output.set_format_type(output_format_type)
+        self.num_outputs = len(self.infer_model.outputs)
+
+    def callback(self, completion_info, bindings, future, last):
+        """
+        Callback function for handling inference results.
+
+        Args:
+            completion_info: Information about the completion of the inference task.
+            bindings: Bindings object containing input and output buffers.
+        """
+        if future._has_had_error:
+            # Don't really know if this can happen.
+            return
+        elif completion_info.exception:
+            future._has_had_error = True
+            future.set_exception(completion_info.exception)
+        else:
+            if self.num_outputs <= 1:
+                # Only one output. Return the output directly.
+                if self.batch_size is None:
+                    # No batching. Return this single output on its own.
+                    future._intermediate_result = bindings.output().get_buffer()
+                else:
+                    # Return a list containing an output for each item in the batch.
+                    future._intermediate_result.append(bindings.output().get_buffer())
+            else:
+                # Multiple outputs. Return a dictionary of outputs keyed on the layer name.
+                if self.batch_size is None:
+                    # No batching. Use a single output as the value for each key.
+                    for name in bindings._output_names:
+                        future._intermediate_result[name] = bindings.output(name).get_buffer()
+                else:
+                    # Each key contains a list of outputs, one per item in the batch.
+ for name in bindings._output_names: + future._intermediate_result[name].append(bindings.output(name).get_buffer()) + if last: + future.set_result(future._intermediate_result) + + def _get_vstream_info(self): + """ + Get information about input and output stream layers. + + Returns: + tuple: List of input stream layer information, List of output stream layer information. + """ + input_vstream_info = self.hef.get_input_vstream_infos() + output_vstream_info = self.hef.get_output_vstream_infos() + + return input_vstream_info, output_vstream_info + + def get_input_shape(self): + """ + Get the shape of the model's input layer. + + Returns: + tuple: Shape of the model's input layer. + """ + return self.input_vstream_info[0].shape # Assumes that the model has one input + + def describe(self): + """ + Return information that describes what's in the model. + + Returns: + A pair of lists containing, respectively, information about the input and output layers. + """ + inputs = [(layer.name, layer.shape, layer.format.type) for layer in self.hef.get_input_vstream_infos()] + outputs = [(layer.name, layer.shape, layer.format.type) for layer in self.hef.get_output_vstream_infos()] + + return inputs, outputs + + def run_async(self, input_data): + """ + Run asynchronous inference on the Hailo-8 device. + + Args: + input_data (np.ndarray): Input data for inference. + + Returns: + future: Future to wait on for the inference results. + """ + if self.batch_size is None: + input_data = np.expand_dims(input_data, axis=0) + + future = Future() + future._has_had_error = False + if self.num_outputs <= 1: + future._intermediate_result = [] + else: + future._intermediate_result = {output.name: [] for output in self.infer_model.outputs} + + for i, frame in enumerate(input_data): + last = i == len(input_data) - 1 + bindings = self._create_bindings() + bindings.input().set_buffer(frame) + self.configured_infer_model.wait_for_async_ready(timeout_ms=10000) + self.configured_infer_model.run_async([bindings], + partial(self.callback, bindings=bindings, future=future, last=last)) + + return future + + def run(self, input_data): + """ + Run asynchronous inference on the Hailo-8 device. + + Args: + input_data (np.ndarray): Input data for inference. + + Returns: + inference output or list: Inference output or List of inference outputs if batch_size is not None. + """ + future = self.run_async(input_data) + return future.result() + + def _create_bindings(self): + """ + Create bindings for input and output buffers. + + Returns: + bindings: Bindings object with input and output buffers. + """ + output_buffers = {name: np.empty(self.infer_model.output(name).shape, dtype=np.float32) + for name in self.infer_model.output_names} + return self.configured_infer_model.create_bindings(output_buffers=output_buffers) + + def close(self): + """Release the Hailo device.""" + del self.configured_infer_model + self.target.release()
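As a sketch of how the new Hailo class can be driven with batching (the HEF path is the detect.py default; the batch size, dummy frames and UINT8 input type are illustrative assumptions):

import numpy as np

from picamera2.devices import Hailo

# Open the device with a batch size, so run() accepts several frames at once.
# The HEF path below is the default used by examples/hailo/detect.py.
with Hailo("/usr/share/hailo-models/yolov8s_h8l.hef", batch_size=4) as hailo:
    h, w, channels = hailo.get_input_shape()  # shape of the model's input layer
    inputs, outputs = hailo.describe()  # input/output layer names, shapes and format types

    # Dummy batch of frames; a UINT8 input layer is assumed here.
    frames = np.zeros((4, h, w, channels), dtype=np.uint8)

    # With batch_size set, run() returns one result per frame: a list for a
    # single-output model, or a dict of lists keyed on layer name otherwise.
    results = hailo.run(frames)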