From 0914e5230d0b53342d51d3a1521b16c5baa6a6a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sp=C3=B6rk?= Date: Sun, 21 Jan 2024 20:01:25 +0100 Subject: [PATCH] Refactor calibration --- foosball/__init__.py | 6 +- foosball/detectors/__init__.py | 21 ++- foosball/detectors/color.py | 72 ++++++--- foosball/models.py | 13 +- foosball/sink/opencv.py | 255 ++++++++++++++++---------------- foosball/tracking/__init__.py | 49 +----- foosball/tracking/ai.py | 72 ++++----- foosball/tracking/analyze.py | 4 +- foosball/tracking/preprocess.py | 18 +-- foosball/tracking/render.py | 7 +- foosball/tracking/tracker.py | 40 ++--- foosball/utils.py | 12 +- 12 files changed, 289 insertions(+), 280 deletions(-) diff --git a/foosball/__init__.py b/foosball/__init__.py index 5955849..28599d2 100644 --- a/foosball/__init__.py +++ b/foosball/__init__.py @@ -6,7 +6,7 @@ from const import CALIBRATION_MODE, CALIBRATION_IMG_PATH, CALIBRATION_VIDEO, CALIBRATION_SAMPLE_SIZE, ARUCO_BOARD, \ FILE, CAMERA_ID, FRAMERATE, OUTPUT, CAPTURE, DISPLAY, BALL, XPAD, YPAD, SCALE, VERBOSE, HEADLESS, OFF, \ - MAX_PIPE_SIZE, INFO_VERBOSITY, GPU, AUDIO, WEBHOOK, BUFFER + MAX_PIPE_SIZE, INFO_VERBOSITY, GPU, AUDIO, WEBHOOK, BUFFER, BallPresets, CalibrationMode from foosball.arUcos.calibration import print_aruco_board, calibrate_camera from foosball.tracking.ai import AI @@ -39,7 +39,7 @@ def get_argparse(): ap.add_argument("-conf", "--config", type=file_path, default=None) calibration = ap.add_argument_group(title="Calibration", description="Options for camera or setup calibration") - calibration.add_argument("-c", f"--{CALIBRATION_MODE}", choices=['ball', 'goal', 'cam'], help="Calibration mode") + calibration.add_argument("-c", f"--{CALIBRATION_MODE}", choices=[CalibrationMode.BALL, CalibrationMode.GOAL, CalibrationMode.CAM], help="Calibration mode") calibration.add_argument("-ci", f"--{CALIBRATION_IMG_PATH}", type=dir_path, default=None, help="Images path for calibration mode. If not given switching to live calibration") calibration.add_argument("-cv", f"--{CALIBRATION_VIDEO}", type=file_path, default=None, @@ -57,7 +57,7 @@ def get_argparse(): io.add_argument("-d", f"--{DISPLAY}", choices=['cv', 'gear'], default='cv', help="display backend cv=direct display, gear=stream") tracker = ap.add_argument_group(title="Tracker", description="Options for the ball/goal tracker") - tracker.add_argument("-ba", f"--{BALL}", choices=['yaml', 'orange', 'yellow'], default='yaml', + tracker.add_argument("-ba", f"--{BALL}", choices=[BallPresets.YAML, BallPresets.ORANGE, BallPresets.YELLOW], default=BallPresets.YAML, help="Pre-configured ball color bounds. If 'yaml' is selected, a file called 'ball.yaml' " "(stored by hitting 's' in ball calibration mode) will be loaded as a preset." "If no file present fallback to 'yellow'") diff --git a/foosball/detectors/__init__.py b/foosball/detectors/__init__.py index 37e8e26..313cef3 100644 --- a/foosball/detectors/__init__.py +++ b/foosball/detectors/__init__.py @@ -1,15 +1,28 @@ from abc import ABC, abstractmethod from typing import TypeVar, Generic -from foosball.models import Frame +from foosball.models import Frame, DetectedBall, DetectedGoals DetectorResult = TypeVar('DetectorResult') +DetectorConfig = TypeVar('DetectorConfig') +BallConfig = TypeVar('BallConfig') +GoalConfig = TypeVar('GoalConfig') -class Detector(ABC, Generic[DetectorResult]): - def __init__(self, *args, **kwargs): - pass +class Detector(ABC, Generic[DetectorConfig, DetectorResult]): + def __init__(self, config: DetectorConfig, *args, **kwargs): + self.config = config @abstractmethod def detect(self, frame: Frame) -> DetectorResult | None: pass + + +class BallDetector(Generic[BallConfig], Detector[BallConfig, DetectedBall], ABC): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class GoalDetector(Generic[GoalConfig], Detector[GoalConfig, DetectedGoals], ABC): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) \ No newline at end of file diff --git a/foosball/detectors/color.py b/foosball/detectors/color.py index 86b8159..f731fa1 100644 --- a/foosball/detectors/color.py +++ b/foosball/detectors/color.py @@ -1,24 +1,21 @@ import logging import os -from abc import ABC from dataclasses import dataclass -from typing import TypeVar, Generic import cv2 import imutils import numpy as np import yaml -from . import Detector, DetectorResult -from ..models import Frame, DetectedGoals, Point, Goal, Blob, Goals, DetectedBall, HSV +from const import BallPresets +from . import BallDetector, GoalDetector +from ..models import Frame, DetectedGoals, Point, Goal, Blob, Goals, DetectedBall, HSV, rgb2hsv logger = logging.getLogger(__name__) -DetectorConfig = TypeVar('DetectorConfig') - @dataclass -class BallConfig: +class BallColorConfig: bounds: [HSV, HSV] invert_frame: bool = False invert_mask: bool = False @@ -29,21 +26,48 @@ def store(self): with open(filename, "w") as f: yaml.dump(self.to_dict(), f) + @staticmethod + def yellow(): + lower = rgb2hsv(np.array([140, 86, 73])) + upper = rgb2hsv(np.array([0, 255, 94])) + + return BallColorConfig(bounds=[lower, upper], invert_frame=False, invert_mask=False) + + @staticmethod + def orange(): + lower = rgb2hsv(np.array([166, 94, 72])) + upper = rgb2hsv(np.array([0, 249, 199])) + + return BallColorConfig(bounds=[lower, upper], invert_frame=False, invert_mask=False) + + @staticmethod + def preset(ball: str): + match ball: + case BallPresets.YAML: + return BallColorConfig.load() or BallColorConfig.yellow() + case BallPresets.ORANGE: + return BallColorConfig.orange() + case BallPresets.YELLOW: + return BallColorConfig.yellow() + case _: + logging.error("Unknown ball color. Falling back to 'yellow'") + return BallColorConfig.yellow() + @staticmethod def load(filename='ball.yaml'): if os.path.isfile(filename): logging.info("Loading ball config ball.yaml") with open(filename, 'r') as f: c = yaml.safe_load(f) - return BallConfig(invert_frame=c['invert_frame'], invert_mask=c['invert_mask'], - bounds=np.array(c['bounds'])) + return BallColorConfig(invert_frame=c['invert_frame'], invert_mask=c['invert_mask'], + bounds=np.array(c['bounds'])) else: logging.info("No ball config found") return None def __eq__(self, other): """Overrides the default implementation""" - if isinstance(other, BallConfig): + if isinstance(other, BallColorConfig): return (all([a == b for a, b in zip(self.bounds[0], other.bounds[0])]) and all([a == b for a, b in zip(self.bounds[1], other.bounds[1])]) and self.invert_mask == other.invert_mask and @@ -58,13 +82,19 @@ def to_dict(self): } - @dataclass -class GoalConfig: +class GoalColorConfig: bounds: [int, int] invert_frame: bool = True invert_mask: bool = True + @staticmethod + def preset(): + default_config = GoalColorConfig(bounds=[0, 235], invert_frame=True, invert_mask=True) + if os.path.isfile('goal.yaml'): + return GoalColorConfig.load() or default_config + return default_config + def store(self): filename = "goal.yaml" print(f"Store config {filename}" + (" " * 50), end="\n\n") @@ -75,11 +105,11 @@ def store(self): def load(filename='goal.yaml'): with open(filename, 'r') as f: c = yaml.safe_load(f) - return GoalConfig(**c) + return GoalColorConfig(**c) def __eq__(self, other): """Overrides the default implementation""" - if isinstance(other, GoalConfig): + if isinstance(other, GoalColorConfig): return (all([a == b for a, b in zip(self.bounds, other.bounds)]) and self.invert_mask == other.invert_mask and self.invert_frame == other.invert_frame) @@ -93,13 +123,7 @@ def to_dict(self): } -class ColorDetector(Generic[DetectorConfig, DetectorResult], Detector[DetectorResult], ABC): - def __init__(self, config: DetectorConfig, *args, **kwargs): - super().__init__(*args, **kwargs) - self.config = config - - -class BallDetector(ColorDetector[BallConfig, DetectedBall]): +class BallColorDetector(BallDetector[BallColorConfig]): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -112,7 +136,7 @@ def detect(self, frame) -> DetectedBall: logger.error("Ball Detection not possible. Config is None") -class GoalDetector(ColorDetector[GoalConfig, DetectedGoals]): +class GoalColorDetector(GoalDetector[GoalColorConfig]): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -156,7 +180,7 @@ def detect(self, frame: Frame) -> DetectedGoals: logger.error("Goal Detection not possible. config is None") -def filter_color_range(frame, config: BallConfig) -> Frame: +def filter_color_range(frame, config: BallColorConfig) -> Frame: [lower, upper] = config.bounds f = frame if not config.invert_frame else cv2.bitwise_not(frame) @@ -182,7 +206,7 @@ def filter_color_range(frame, config: BallConfig) -> Frame: return cv2.bitwise_and(f, f, mask=simple_mask) -def filter_gray_range(frame, config: GoalConfig) -> Frame: +def filter_gray_range(frame, config: GoalColorConfig) -> Frame: try: [lower, upper] = config.bounds f = frame if not config.invert_frame else cv2.bitwise_not(frame) diff --git a/foosball/models.py b/foosball/models.py index 99d2fa1..622970e 100644 --- a/foosball/models.py +++ b/foosball/models.py @@ -100,9 +100,15 @@ class DetectedGoals: Track = collections.deque +class Verbosity(Enum): + TRACE = 0 + DEBUG = 1 + INFO = 2 + + @dataclass class Info: - verbosity: int + verbosity: Verbosity title: str value: str @@ -123,8 +129,9 @@ def append(self, info: Info): def concat(self, info_log): self.infos = self.infos + info_log.infos - def filter(self, info_verbosity=0): - return InfoLog(infos=[i for i in self.infos if info_verbosity <= i.verbosity]) + def filter(self, infoVerbosity: Verbosity = Verbosity.TRACE): + return InfoLog( + infos=[i for i in self.infos if infoVerbosity is not None and infoVerbosity.value <= i.verbosity.value]) def to_string(self): return " - ".join([i.to_string() for i in self.infos]) diff --git a/foosball/sink/opencv.py b/foosball/sink/opencv.py index 4ca0fda..45c9d6a 100644 --- a/foosball/sink/opencv.py +++ b/foosball/sink/opencv.py @@ -1,15 +1,16 @@ +from abc import ABC, abstractmethod +from copy import deepcopy from enum import Enum -from typing import Callable +from typing import Callable, Generic, TypeVar import cv2 import numpy as np import yaml +from const import CalibrationMode from . import Sink -from ..detectors.color import BallConfig, GoalConfig - -GOAL = "goal" -BALL = "ball" +from ..detectors.color import BallColorConfig, GoalColorConfig +from ..utils import int2bool, avg class Key(Enum): @@ -68,137 +69,133 @@ def wait(loop=False, interval=1, callbacks=None): return False -def slider_label(rgb, bound): - return f"{rgb} ({bound})" - - -def add_config_input(calibrationMode, config): - if calibrationMode == GOAL: - add_goals_config_input(config) - elif calibrationMode == BALL: - add_ball_config_input(config) - - -def add_ball_config_input(bounds: BallConfig): - [lower_hsv, upper_hsv] = bounds.bounds - cv2.createTrackbar('invert_frame', BALL, 1 if bounds.invert_frame else 0, 1, lambda v: None) - cv2.createTrackbar('invert_mask', BALL, 1 if bounds.invert_mask else 0, 1, lambda v: None) - # create trackbars for color change - cv2.createTrackbar('Hue', BALL, avg(lower_hsv[0], upper_hsv[0]), 179, lambda v: None) - cv2.createTrackbar(slider_label('S', 'low'), BALL, lower_hsv[1], 255, lambda v: None) - cv2.createTrackbar(slider_label('V', 'low'), BALL, lower_hsv[2], 255, lambda v: None) - cv2.createTrackbar(slider_label('S', 'high'), BALL, upper_hsv[1], 255, lambda v: None) - cv2.createTrackbar(slider_label('V', 'high'), BALL, upper_hsv[2], 255, lambda v: None) - # cv2.createButton("Reset", reset_bounds, (name, lower_rgb, upper_rgb)) - - -def add_goals_config_input(config: GoalConfig): - [lower, upper] = config.bounds - cv2.createTrackbar('invert_frame', GOAL, 1 if config.invert_frame else 0, 1, lambda v: None) - cv2.createTrackbar('invert_mask', GOAL, 1 if config.invert_mask else 0, 1, lambda v: None) - # create trackbars for color change - cv2.createTrackbar("lower", GOAL, lower, 255, lambda v: None) - cv2.createTrackbar("upper", GOAL, upper, 255, lambda v: None) - # cv2.createButton("Reset", reset_bounds, (name, lower_rgb, upper_rgb)) - - -def reset_config(calibrationMode, config): - if calibrationMode == GOAL: - reset_goal_config(config) - elif calibrationMode == BALL: - reset_ball_config(config) - - -def avg(x, y): - return int((x + y) / 2) - -def reset_ball_config(bounds: BallConfig): - [lower_hsv, upper_hsv] = bounds.bounds - print(f"Reset config {BALL}", end="\n\n\n") - - cv2.setTrackbarPos('invert_frame', BALL, 1 if bounds.invert_frame else 0) - cv2.setTrackbarPos('invert_mask', BALL, 1 if bounds.invert_mask else 0) - - cv2.setTrackbarPos('Hue', BALL, avg(lower_hsv[0], upper_hsv[0])) - cv2.setTrackbarPos(slider_label('S', 'low'), BALL, lower_hsv[1]) - cv2.setTrackbarPos(slider_label('V', 'low'), BALL, lower_hsv[2]) - cv2.setTrackbarPos(slider_label('S', 'high'), BALL, upper_hsv[1]) - cv2.setTrackbarPos(slider_label('V', 'high'), BALL, upper_hsv[2]) +def slider_label(name, bound): + return f"{name} ({bound})" -def reset_goal_config(config: GoalConfig): - [lower, upper] = config.bounds - print(f"Reset config {GOAL}", end="\n\n\n") +CalibrationConfig = TypeVar('CalibrationConfig') - cv2.setTrackbarPos('invert_frame', GOAL, 1 if config.invert_frame else 0) - cv2.setTrackbarPos('invert_mask', GOAL, 1 if config.invert_mask else 0) - cv2.setTrackbarPos('lower', GOAL, lower) - cv2.setTrackbarPos('upper', GOAL, upper) +class Calibration(ABC, Generic[CalibrationConfig]): + def __init__(self, config: CalibrationConfig, *args, **kwargs): + self.config: CalibrationConfig = config + self.init_config: CalibrationConfig = deepcopy(config) + def reset(self): + print(f"Reset calibration config", end="\n\n\n") + self.set_slider_config(self.init_config) -def store_ball_config(config: BallConfig): - filename = "ball.yaml" - [lower, upper] = config.bounds - print(f"Store config {filename}" + (" " * 50), end="\n\n") - with open(filename, "w") as f: - yaml.dump({ - "lower": lower.tolist(), - "upper": upper.tolist(), - "invert_frame": config.invert_frame, - "invert_mask": config.invert_mask - }, f) + @abstractmethod + def set_slider_config(self, config: CalibrationConfig): + pass + @abstractmethod + def store(self): + pass -def store_goals_config(config: GoalConfig): - filename = "goal.yaml" - [lower, upper] = config.bounds - print(f"Store config {filename}" + (" " * 50), end="\n\n") - with open(filename, "w") as f: - yaml.dump({ - "lower": lower, - "upper": upper, - "invert_frame": config.invert_frame, - "invert_mask": config.invert_mask - }, f) + @abstractmethod + def get_slider_config(self) -> CalibrationConfig: + pass -def get_slider_config(calibrationMode): - if calibrationMode == GOAL: - return get_slider_goals_config() - elif calibrationMode == BALL: - return get_slider_ball_config() - - -def int2bool(x: int) -> bool: - return True if x == 1 else False - - -def get_slider_ball_config(): - # get current positions of four trackbars - invert_frame = cv2.getTrackbarPos('invert_frame', BALL) - invert_mask = cv2.getTrackbarPos('invert_mask', BALL) - - hue = cv2.getTrackbarPos('Hue', BALL) - hl = max(0, hue - 10) - hh = min(179, hue + 10) - - sl = cv2.getTrackbarPos(slider_label('S', 'low'), BALL) - sh = cv2.getTrackbarPos(slider_label('S', 'high'), BALL) - - vl = cv2.getTrackbarPos(slider_label('V', 'low'), BALL) - vh = cv2.getTrackbarPos(slider_label('V', 'high'), BALL) - lower = np.array([hl, sl, vl]) - upper = np.array([hh, sh, vh]) - return BallConfig(bounds=[lower, upper], invert_mask=int2bool(invert_mask), invert_frame=int2bool(invert_frame)) - - -def get_slider_goals_config(): - # get current positions of four trackbars - invert_frame = cv2.getTrackbarPos('invert_frame', GOAL) - invert_mask = cv2.getTrackbarPos('invert_mask', GOAL) - - lower = cv2.getTrackbarPos('lower', GOAL) - upper = cv2.getTrackbarPos('upper', GOAL) - - return GoalConfig(bounds=[lower, upper], invert_mask=int2bool(invert_mask), invert_frame=int2bool(invert_frame)) +class GoalColorCalibration(Calibration[GoalColorConfig]): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + [lower, upper] = self.init_config.bounds + cv2.createTrackbar('invert_frame', CalibrationMode.GOAL, 1 if self.init_config.invert_frame else 0, 1, lambda v: None) + cv2.createTrackbar('invert_mask', CalibrationMode.GOAL, 1 if self.init_config.invert_mask else 0, 1, lambda v: None) + # create trackbars for color change + cv2.createTrackbar("lower", CalibrationMode.GOAL, lower, 255, lambda v: None) + cv2.createTrackbar("upper", CalibrationMode.GOAL, upper, 255, lambda v: None) + # cv2.createButton("Reset", reset_bounds, (name, lower_rgb, upper_rgb)) + + def set_slider_config(self, config: GoalColorConfig): + [lower, upper] = config.bounds + print(f"Reset config {CalibrationMode.GOAL}", end="\n\n\n") + + cv2.setTrackbarPos('invert_frame', CalibrationMode.GOAL, 1 if config.invert_frame else 0) + cv2.setTrackbarPos('invert_mask', CalibrationMode.GOAL, 1 if config.invert_mask else 0) + + cv2.setTrackbarPos('lower', CalibrationMode.GOAL, lower) + cv2.setTrackbarPos('upper', CalibrationMode.GOAL, upper) + + def get_slider_config(self) -> GoalColorConfig: + # get current positions of four trackbars + invert_frame = cv2.getTrackbarPos('invert_frame', CalibrationMode.GOAL) + invert_mask = cv2.getTrackbarPos('invert_mask', CalibrationMode.GOAL) + + lower = cv2.getTrackbarPos('lower', CalibrationMode.GOAL) + upper = cv2.getTrackbarPos('upper', CalibrationMode.GOAL) + + return GoalColorConfig(bounds=[lower, upper], invert_mask=int2bool(invert_mask), invert_frame=int2bool(invert_frame)) + + def store(self): + filename = "goal.yaml" + c = self.get_slider_config() + [lower, upper] = c + print(f"Store config {filename}" + (" " * 50), end="\n\n") + with open(filename, "w") as f: + yaml.dump({ + "lower": lower, + "upper": upper, + "invert_frame": c.invert_frame, + "invert_mask": c.invert_mask + }, f) + + +class BallColorCalibration(Calibration[BallColorConfig]): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + [lower_hsv, upper_hsv] = self.init_config.bounds + cv2.createTrackbar('invert_frame', CalibrationMode.BALL, 1 if self.init_config.invert_frame else 0, 1, lambda v: None) + cv2.createTrackbar('invert_mask', CalibrationMode.BALL, 1 if self.init_config.invert_mask else 0, 1, lambda v: None) + # create trackbars for color change + cv2.createTrackbar('Hue', CalibrationMode.BALL, avg(lower_hsv[0], upper_hsv[0]), 179, lambda v: None) + cv2.createTrackbar(slider_label('S', 'low'), CalibrationMode.BALL, lower_hsv[1], 255, lambda v: None) + cv2.createTrackbar(slider_label('V', 'low'), CalibrationMode.BALL, lower_hsv[2], 255, lambda v: None) + cv2.createTrackbar(slider_label('S', 'high'), CalibrationMode.BALL, upper_hsv[1], 255, lambda v: None) + cv2.createTrackbar(slider_label('V', 'high'), CalibrationMode.BALL, upper_hsv[2], 255, lambda v: None) + # cv2.createButton("Reset", reset_bounds, (name, lower_rgb, upper_rgb)) + + def set_slider_config(self, config: BallColorConfig): + [lower_hsv, upper_hsv] = config.bounds + print(f"Reset config {CalibrationMode.BALL}", end="\n\n\n") + + cv2.setTrackbarPos('invert_frame', CalibrationMode.BALL, 1 if config.invert_frame else 0) + cv2.setTrackbarPos('invert_mask', CalibrationMode.BALL, 1 if config.invert_mask else 0) + + cv2.setTrackbarPos('Hue', CalibrationMode.BALL, avg(lower_hsv[0], upper_hsv[0])) + cv2.setTrackbarPos(slider_label('S', 'low'), CalibrationMode.BALL, lower_hsv[1]) + cv2.setTrackbarPos(slider_label('V', 'low'), CalibrationMode.BALL, lower_hsv[2]) + cv2.setTrackbarPos(slider_label('S', 'high'), CalibrationMode.BALL, upper_hsv[1]) + cv2.setTrackbarPos(slider_label('V', 'high'), CalibrationMode.BALL, upper_hsv[2]) + + def get_slider_config(self) -> BallColorConfig: + # get current positions of four trackbars + invert_frame = cv2.getTrackbarPos('invert_frame', CalibrationMode.BALL) + invert_mask = cv2.getTrackbarPos('invert_mask', CalibrationMode.BALL) + + hue = cv2.getTrackbarPos('Hue', CalibrationMode.BALL) + hl = max(0, hue - 10) + hh = min(179, hue + 10) + + sl = cv2.getTrackbarPos(slider_label('S', 'low'), CalibrationMode.BALL) + sh = cv2.getTrackbarPos(slider_label('S', 'high'), CalibrationMode.BALL) + + vl = cv2.getTrackbarPos(slider_label('V', 'low'), CalibrationMode.BALL) + vh = cv2.getTrackbarPos(slider_label('V', 'high'), CalibrationMode.BALL) + lower = np.array([hl, sl, vl]) + upper = np.array([hh, sh, vh]) + return BallColorConfig(bounds=[lower, upper], invert_mask=int2bool(invert_mask), invert_frame=int2bool(invert_frame)) + + def store(self): + filename = "ball.yaml" + c = self.get_slider_config() + [lower, upper] = c.bounds + print(f"Store config {filename}" + (" " * 50), end="\n\n") + with open(filename, "w") as f: + yaml.dump({ + "bounds": [lower.tolist(), upper.tolist()], + "invert_frame": c.invert_frame, + "invert_mask": c.invert_mask + }, f) diff --git a/foosball/tracking/__init__.py b/foosball/tracking/__init__.py index 56621c4..dd7aadc 100644 --- a/foosball/tracking/__init__.py +++ b/foosball/tracking/__init__.py @@ -1,17 +1,16 @@ -import logging -import os.path from multiprocessing import Queue import cv2 import numpy as np -from const import GPU, CalibrationMode, BallPresets +from const import GPU, CalibrationMode from .analyze import Analyzer from .preprocess import PreProcessor from .render import Renderer from .tracker import Tracker -from ..detectors.color import BallConfig, GoalConfig -from ..models import Mask, FrameDimensions, rgb2hsv +from ..detectors import BallDetector +from ..detectors.color import GoalDetector +from ..models import Mask, FrameDimensions from ..pipe.Pipe import Pipe @@ -19,40 +18,6 @@ def dim(frame) -> [int, int]: return [frame.shape[1], frame.shape[0]] -def yellow_ball() -> BallConfig: - lower = rgb2hsv(np.array([140, 86, 73])) - upper = rgb2hsv(np.array([0, 255, 94])) - - return BallConfig(bounds=[lower, upper], invert_frame=False, invert_mask=False) - - -def orange_ball() -> BallConfig: - lower = rgb2hsv(np.array([166, 94, 72])) - upper = rgb2hsv(np.array([0, 249, 199])) - - return BallConfig(bounds=[lower, upper], invert_frame=False, invert_mask=False) - - -def get_goal_config() -> GoalConfig: - default_config = GoalConfig(bounds=[0, 235], invert_frame=True, invert_mask=True) - if os.path.isfile('goal.yaml'): - return GoalConfig.load() or default_config - return default_config - - -def get_ball_config(ball: str) -> BallConfig: - match ball: - case BallPresets.YAML: - return BallConfig.load() or yellow_ball() - case BallPresets.ORANGE: - return orange_ball() - case BallPresets.YELLOW: - return yellow_ball() - case _: - logging.error("Unknown ball color. Falling back to 'yellow'") - return yellow_ball() - - def generate_frame_mask(width, height) -> Mask: bar_color = 255 bg = 0 @@ -65,16 +30,16 @@ def generate_frame_mask(width, height) -> Mask: class Tracking: - def __init__(self, stream, dims: FrameDimensions, ball_config: BallConfig, goal_config: GoalConfig, headless=False, maxPipeSize=128, calibrationMode=None, **kwargs): + def __init__(self, stream, dims: FrameDimensions, goal_detector: GoalDetector, ball_detector: BallDetector, headless=False, maxPipeSize=128, calibrationMode=None, **kwargs): super().__init__() self.calibrationMode = calibrationMode width, height = dims.scaled mask = generate_frame_mask(width, height) gpu_flags = kwargs.get(GPU) - self.preprocessor = PreProcessor(dims, goal_config, mask=mask, headless=headless, useGPU='preprocess' in gpu_flags, + self.preprocessor = PreProcessor(dims, goal_detector, mask=mask, headless=headless, useGPU='preprocess' in gpu_flags, calibrationMode=calibrationMode, **kwargs) - self.tracker = Tracker(ball_config, useGPU='tracker' in gpu_flags, calibrationMode=calibrationMode, **kwargs) + self.tracker = Tracker(ball_detector, useGPU='tracker' in gpu_flags, calibrationMode=calibrationMode, **kwargs) self.analyzer = Analyzer(**kwargs) self.renderer = Renderer(dims, headless=headless, useGPU='render' in gpu_flags, **kwargs) diff --git a/foosball/tracking/ai.py b/foosball/tracking/ai.py index d0f6f03..c8559dd 100644 --- a/foosball/tracking/ai.py +++ b/foosball/tracking/ai.py @@ -5,12 +5,13 @@ from imutils.video import FPS from vidgear.gears import WriteGear -from const import HEADLESS, CALIBRATION_MODE, BALL, INFO_VERBOSITY, OUTPUT, CalibrationMode, SCALE -from . import Tracking, get_ball_config, get_goal_config +from const import HEADLESS, CALIBRATION_MODE, INFO_VERBOSITY, OUTPUT, CalibrationMode, SCALE, BALL +from . import Tracking from .render import r_text, BLACK +from ..detectors.color import GoalColorConfig, BallColorDetector, BallColorConfig, GoalColorDetector +from ..models import FrameDimensions, Frame, InfoLog, Verbosity +from ..sink.opencv import DisplaySink, Key, BallColorCalibration, GoalColorCalibration, CalibrationConfig from ..source import Source -from ..sink.opencv import DisplaySink, get_slider_config, add_config_input, reset_config, Key -from ..models import FrameDimensions, Frame, InfoLog BLANKS = (' ' * 80) @@ -26,18 +27,9 @@ def __init__(self, source: Source, dis, *args, **kwargs): self.paused = False self.calibrationMode = kwargs.get(CALIBRATION_MODE) self._stopped = False - self.ball_config = get_ball_config(kwargs.get(BALL)) - self.goals_config = get_goal_config() - self.info_verbosity = kwargs.get(INFO_VERBOSITY) + self.infoVerbosity = Verbosity(kwargs.get(INFO_VERBOSITY)) if kwargs.get(INFO_VERBOSITY) else None self.output = None if kwargs.get(OUTPUT) is None else WriteGear(kwargs.get(OUTPUT), logging=True) - - if self.calibrationMode is not None: - match self.calibrationMode: - case CalibrationMode.BALL: - self.calibration_config = lambda: self.ball_config - case CalibrationMode.GOAL: - self.calibration_config = lambda: self.goals_config self.detection_frame = None original = self.source.dim() @@ -45,29 +37,29 @@ def __init__(self, source: Source, dis, *args, **kwargs): scaled = self.scale_dim(original, self.scale) self.dims = FrameDimensions(original, scaled, self.scale) - self.tracking = Tracking(self.source, self.dims, self.ball_config, self.goals_config, **kwargs) + self.goal_detector = GoalColorDetector(GoalColorConfig.preset()) + self.ball_detector = BallColorDetector(BallColorConfig.preset(kwargs.get(BALL))) + self.tracking = Tracking(self.source, self.dims, self.goal_detector, self.ball_detector, **kwargs) if not self.headless and self.calibrationMode is not None: + # init calibration window self.calibration_display = DisplaySink(self.calibrationMode, pos='br') - # init slider window - add_config_input(self.calibrationMode, self.calibration_config()) + match self.calibrationMode: + case CalibrationMode.BALL: + self.calibration = BallColorCalibration(self.ball_detector.config) + case CalibrationMode.GOAL: + self.calibration = GoalColorCalibration(self.goal_detector.config) + case _: + self.calibration = None self.fps = FPS() - def set_calibration_config(self, config: dict): - if self.calibrationMode is not None: - if self.calibrationMode == 'ball': - self.ball_config = config - else: - self.goals_config = config - def stop(self): self._stopped = True def process_video(self): def reset_calibration(): - if self.calibrationMode is not None: - reset_config(self.calibrationMode, self.calibration_config()) + self.calibration.reset() return False def reset_score(): @@ -75,10 +67,7 @@ def reset_score(): return False def store_calibration(): - if self.calibrationMode is not None: - self.calibration_config().store() - else: - logging.info("calibration not found. config not stored") + self.calibration.store() return False def pause(): @@ -103,7 +92,7 @@ def step_frame(): ord('q'): lambda: True, Key.SPACE.value: pause, ord('s'): store_calibration, - ord('r'): reset_calibration if self.calibrationMode else reset_score, + ord('r'): reset_calibration if self.calibration else reset_score, ord('n'): step_frame } @@ -126,12 +115,14 @@ def step_frame(): self.sink.show(frame) if self.output is not None: self.output.write(frame) - if self.calibrationMode is not None: + if self.calibration is not None: self.render_calibration() if self.sink.render(callbacks=callbacks): break else: - print(f"{info.filter(self.info_verbosity).to_string() if self.info_verbosity is not None else ''} - FPS: {fps} {BLANKS}", end="\r") + print( + f"{info.filter(self.infoVerbosity).to_string() if self.infoVerbosity is not None else ''} - FPS: {fps} {BLANKS}", + end="\r") except Empty: # logger.debug("No new frame") pass @@ -143,7 +134,7 @@ def step_frame(): if not self.headless: self.sink.stop() - if self.calibrationMode is not None: + if self.calibration is not None: self.calibration_display.stop() self.tracking.stop() logging.debug("ai stopped") @@ -157,7 +148,8 @@ def render_fps(frame: Frame, fps: int): color = (0, 255, 127) else: color = (100, 0, 255) - r_text(frame, f"FPS: {frames_per_second}", frame.shape[1], 0, color, background=BLACK, text_scale=0.5, thickness=1, padding=(20, 20), ground_zero='tr') + r_text(frame, f"FPS: {frames_per_second}", frame.shape[1], 0, color, background=BLACK, text_scale=0.5, + thickness=1, padding=(20, 20), ground_zero='tr') def render_calibration(self): try: @@ -168,11 +160,11 @@ def render_calibration(self): def adjust_calibration(self): # see if some sliders changed - if self.calibrationMode in ["goal", "ball"]: - new_config = get_slider_config(self.calibrationMode) - if new_config != self.calibration_config(): - self.set_calibration_config(new_config) - self.tracking.config_input(self.calibration_config()) + if self.calibrationMode in [CalibrationMode.GOAL, CalibrationMode.BALL]: + new_config = self.calibration.get_slider_config() + if new_config != self.calibration.config: + self.calibration.config = new_config + self.tracking.config_input(self.calibration.config) @staticmethod def scale_dim(dim, scale_percent): diff --git a/foosball/tracking/analyze.py b/foosball/tracking/analyze.py index 9606a42..5158077 100644 --- a/foosball/tracking/analyze.py +++ b/foosball/tracking/analyze.py @@ -4,7 +4,7 @@ from .. import hooks from ..hooks import generate_goal_webhook -from ..models import Team, Goals, Score, AnalyzeResult, Track, Info +from ..models import Team, Goals, Score, AnalyzeResult, Track, Info, Verbosity from ..pipe.BaseProcess import BaseProcess, Msg from ..utils import contains @@ -61,7 +61,7 @@ def process(self, msg: Msg) -> Msg: self.logger.error("Error in analyzer ", e) traceback.print_exc() self.last_track = track - info.append(Info(verbosity=2, title="Score", value=self.score.to_string())) + info.append(Info(verbosity=Verbosity.INFO, title="Score", value=self.score.to_string())) return Msg(kwargs={"result": AnalyzeResult(score=self.score, ball=ball, goals=goals, frame=frame, info=info, ball_track=track)}) diff --git a/foosball/tracking/preprocess.py b/foosball/tracking/preprocess.py index 490299e..9a68c0c 100644 --- a/foosball/tracking/preprocess.py +++ b/foosball/tracking/preprocess.py @@ -9,9 +9,9 @@ from const import CalibrationMode, OFF from ..arUcos import calibration from ..arUcos.models import Aruco -from ..detectors.color import GoalDetector, GoalConfig +from ..detectors.color import GoalDetector, GoalColorConfig from ..models import Frame, PreprocessResult, Point, Rect, Blob, Goals, FrameDimensions, ScaleDirection, \ - InfoLog, Info + InfoLog, Info, Verbosity from ..pipe.BaseProcess import BaseProcess, Msg from ..pipe.Pipe import clear from ..utils import ensure_cpu, generate_processor_switches, relative_change, scale @@ -40,7 +40,7 @@ def pad_rect(rectangle: Rect, xpad: int, ypad: int) -> Rect: class PreProcessor(BaseProcess): - def __init__(self, dims: FrameDimensions, goal_config: GoalConfig, headless=True, mask=None, used_markers=None, + def __init__(self, dims: FrameDimensions, goal_detector: GoalDetector, headless=True, mask=None, used_markers=None, redetect_markers_frames: int = 60, aruco_dictionary=cv2.aruco.DICT_4X4_1000, aruco_params=cv2.aruco.DetectorParameters(), xpad: int = 50, ypad: int = 20, goal_change_threshold: float = 0.10, useGPU: bool = False, calibrationMode=None, verbose=False, **kwargs): @@ -66,9 +66,9 @@ def __init__(self, dims: FrameDimensions, goal_config: GoalConfig, headless=True self.goals_calibration = self.calibrationMode == CalibrationMode.GOAL self.calibration_out = Queue() if self.goals_calibration else None self.config_in = Queue() if self.goals_calibration else None - self.goal_detector = GoalDetector(goal_config) + self.goal_detector = goal_detector - def config_input(self, config: GoalConfig) -> None: + def config_input(self, config: GoalColorConfig) -> None: if self.goals_calibration: self.config_in.put_nowait(config) @@ -104,14 +104,14 @@ def process(self, msg: Msg) -> Msg: trigger_marker_detection = self.frames_since_last_marker_detection == 0 or len(self.markers) == 0 goal_info = None - marker_info = Info(verbosity=0, title=f'{"? " if trigger_marker_detection else ""}Markers', value=f'{len(self.markers)}'.ljust(10, ' ')) + marker_info = Info(verbosity=Verbosity.TRACE, title=f'{"? " if trigger_marker_detection else ""}Markers', value=f'{len(self.markers)}'.ljust(10, ' ')) if not self.kwargs.get(OFF): if trigger_marker_detection: # detect markers markers = self.detect_markers(frame) # check if there are exactly 4 markers present markers = [m for m in markers if m.id in self.used_markers] - marker_info = Info(verbosity=0, title=f'{"! " if len(markers) != 4 else ""}Markers', value=f'{len(markers)}'.ljust(10, ' ')) + marker_info = Info(verbosity=Verbosity.TRACE, title=f'{"! " if len(markers) != 4 else ""}Markers', value=f'{len(markers)}'.ljust(10, ' ')) # logging.debug(f"markers {[list(m.id)[0] for m in markers]}") if len(markers) == 4: self.markers = markers @@ -137,9 +137,9 @@ def process(self, msg: Msg) -> Msg: right=self.goals.right if abs(right_change) < self.goal_change_threshold else new_goals.right ) # TODO: distinguish between red or blue goal (instead of left and right) - goal_info = Info(verbosity=1, title='goals', value=f'{"detected" if self.goals is not None else "fail"}') + goal_info = Info(verbosity=Verbosity.DEBUG, title='goals', value=f'{"detected" if self.goals is not None else "fail"}') else: - goal_info = Info(verbosity=1, title='goals', value='none'.rjust(8, ' ')) + goal_info = Info(verbosity=Verbosity.DEBUG, title='goals', value='none'.rjust(8, ' ')) preprocessed = self.mask_frame(frame) info.append(marker_info) if goal_info is not None: diff --git a/foosball/tracking/render.py b/foosball/tracking/render.py index 902b069..21c38a4 100644 --- a/foosball/tracking/render.py +++ b/foosball/tracking/render.py @@ -4,7 +4,8 @@ import cv2 import numpy as np -from ..models import Goal, Score, FrameDimensions, Blob, InfoLog +from const import INFO_VERBOSITY +from ..models import Goal, Score, FrameDimensions, Blob, InfoLog, Verbosity from ..pipe.BaseProcess import Msg, BaseProcess from ..utils import generate_processor_switches logger = logging.getLogger(__name__) @@ -102,11 +103,11 @@ class Renderer(BaseProcess): def close(self): pass - def __init__(self, dims: FrameDimensions, headless=False, useGPU: bool = False, infoVerbosity=None, *args, **kwargs): + def __init__(self, dims: FrameDimensions, headless=False, useGPU: bool = False, *args, **kwargs): super().__init__(name="Renderer") self.dims = dims self.headless = headless - self.infoVerbosity = infoVerbosity + self.infoVerbosity = Verbosity(kwargs.get(INFO_VERBOSITY)) if kwargs.get(INFO_VERBOSITY) else None [self.proc, self.iproc] = generate_processor_switches(useGPU) def process(self, msg: Msg) -> Msg: diff --git a/foosball/tracking/tracker.py b/foosball/tracking/tracker.py index 745ac84..d10bdef 100644 --- a/foosball/tracking/tracker.py +++ b/foosball/tracking/tracker.py @@ -5,8 +5,8 @@ from const import CalibrationMode from .preprocess import WarpMode, project_blob -from ..detectors.color import BallDetector, BallConfig -from ..models import TrackResult, Track, Info, Blob, Goals, InfoLog +from ..detectors.color import BallColorDetector, BallColorConfig +from ..models import TrackResult, Track, Info, Blob, Goals, InfoLog, Verbosity from ..pipe.BaseProcess import BaseProcess, Msg from ..pipe.Pipe import clear from ..utils import generate_processor_switches @@ -19,7 +19,7 @@ def log(result: TrackResult) -> None: class Tracker(BaseProcess): - def __init__(self, ball_config: BallConfig, useGPU: bool = False, buffer=16, off=False, verbose=False, + def __init__(self, ball_detector: BallColorDetector, useGPU: bool = False, buffer=16, off=False, verbose=False, calibrationMode=None, **kwargs): super().__init__(name="Tracker") self.ball_track = Track(maxlen=buffer) @@ -29,13 +29,13 @@ def __init__(self, ball_config: BallConfig, useGPU: bool = False, buffer=16, off [self.proc, self.iproc] = generate_processor_switches(useGPU) # define the lower_ball and upper_ball boundaries of the # ball in the HSV color space, then initialize the - self.ball_calibration = self.calibrationMode == CalibrationMode.BALL - self.ball_detector = BallDetector(ball_config) - self.bounds_in = Queue() if self.ball_calibration else None - self.calibration_out = Queue() if self.ball_calibration else None + self.calibration = self.calibrationMode == CalibrationMode.BALL + self.ball_detector = ball_detector + self.bounds_in = Queue() if self.calibration else None + self.calibration_out = Queue() if self.calibration else None def close(self) -> None: - if self.ball_calibration: + if self.calibration: clear(self.bounds_in) self.bounds_in.close() clear(self.calibration_out) @@ -52,24 +52,24 @@ def update_ball_track(self, detected_ball: Blob) -> Track: def get_info(self, ball_track: Track) -> InfoLog: info = InfoLog(infos=[ - Info(verbosity=1, title="Track length", value=f"{str(sum([1 for p in ball_track or [] if p is not None])).rjust(2, ' ')}"), - Info(verbosity=0, title="Calibration", value=f"{self.calibrationMode if self.calibrationMode is not None else 'off'}"), - Info(verbosity=0, title="Tracker", value=f"{'off' if self.off else 'on'}") + Info(verbosity=Verbosity.DEBUG, title="Track length", value=f"{str(sum([1 for p in ball_track or [] if p is not None])).rjust(2, ' ')}"), + Info(verbosity=Verbosity.TRACE, title="Calibration", value=f"{self.calibrationMode if self.calibrationMode is not None else 'off'}"), + Info(verbosity=Verbosity.TRACE, title="Tracker", value=f"{'off' if self.off else 'on'}") ]) - if self.ball_calibration: + if self.calibration: [lower, upper] = self.ball_detector.config.bounds - info.append(Info(verbosity=0, title="lower", value=f'({",".join(map(str,lower))})')) - info.append(Info(verbosity=0, title="upper", value=f'({",".join(map(str,upper))})')) - info.append(Info(verbosity=0, title="invert frame", value=f'{self.ball_detector.config.invert_frame}')) - info.append(Info(verbosity=0, title="invert mask", value=f'{self.ball_detector.config.invert_mask}')) + info.append(Info(verbosity=Verbosity.TRACE, title="lower", value=f'({",".join(map(str,lower))})')) + info.append(Info(verbosity=Verbosity.TRACE, title="upper", value=f'({",".join(map(str,upper))})')) + info.append(Info(verbosity=Verbosity.TRACE, title="invert frame", value=f'{self.ball_detector.config.invert_frame}')) + info.append(Info(verbosity=Verbosity.TRACE, title="invert mask", value=f'{self.ball_detector.config.invert_mask}')) return info @property def calibration_output(self) -> Queue: return self.calibration_out - def config_input(self, config: BallConfig) -> None: - if self.ball_calibration: + def config_input(self, config: BallColorConfig) -> None: + if self.calibration: self.bounds_in.put_nowait(config) def process(self, msg: Msg) -> Msg: @@ -80,7 +80,7 @@ def process(self, msg: Msg) -> Msg: info = None try: if not self.off: - if self.ball_calibration: + if self.calibration: try: self.ball_detector.config = self.bounds_in.get_nowait() except Empty: @@ -97,7 +97,7 @@ def process(self, msg: Msg) -> Msg: left=project_blob(goals.left, preprocess_result.homography_matrix, WarpMode.DEWARP), right=project_blob(goals.right, preprocess_result.homography_matrix, WarpMode.DEWARP) ) - if self.ball_calibration: + if self.calibration: self.calibration_out.put_nowait(self.iproc(ball_detection_result.frame)) # copy deque, since we otherwise run into odd tracks displayed ball_track = self.update_ball_track(ball).copy() diff --git a/foosball/utils.py b/foosball/utils.py index 7339b20..a0f7142 100644 --- a/foosball/utils.py +++ b/foosball/utils.py @@ -31,6 +31,8 @@ def from_gpu(frame: GPUFrame) -> CPUFrame: def relative_change(old_value, new_value): return (new_value / old_value) - 1 + + def generate_processor_switches(useGPU: bool = False) -> [Callable[[Frame], Frame], Callable[[Frame], Frame]]: if not useGPU: return [lambda x: x, lambda x: x] @@ -39,6 +41,14 @@ def generate_processor_switches(useGPU: bool = False) -> [Callable[[Frame], Fram def ensure_cpu(frame: Frame) -> CPUFrame: - if type(frame) == cv2.UMat: + if isinstance(frame, cv2.UMat): return from_gpu(frame) return frame + + +def avg(x, y): + return int((x + y) / 2) + + +def int2bool(x: int) -> bool: + return True if x == 1 else False