diff --git a/README.md b/README.md index 98527cb..0b00487 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # FrameGrab by Groundlight ## A user-friendly library for grabbing images from cameras or streams -FrameGrab is an open-source Python library designed to make it easy to grab frames (images) from cameras or streams. The library supports webcams, RTSP streams, Basler USB cameras and Intel RealSense depth cameras. +FrameGrab is an open-source Python library designed to make it easy to grab frames (images) from cameras or streams. The library supports generic USB cameras (such as webcams), RTSP streams, Basler USB cameras, Basler GigE cameras, and Intel RealSense depth cameras. FrameGrab also provides basic motion detection functionality. FrameGrab requires Python 3.7 or higher. @@ -20,29 +20,36 @@ To install the FrameGrab library, simply run: pip install framegrab ``` +## Optional Dependencies +To use a Basler USB or GigE camera, you must separately install the `pypylon` package. + +Similarly, to use Intel RealSense cameras, you must install `pyrealsense2`. + +If you don't intend to use these camera types, you don't need to install these extra packages. + ## Usage ### Frame Grabbing -Simple usage with a single webcam would look something like the following: +Simple usage with a single USB camera would look something like the following: ``` from framegrab import FrameGrabber config = { - 'input_type': 'webcam', + 'input_type': 'generic_usb', } grabber = FrameGrabber.create_grabber(config) ``` -`config` can contain many details and settings about your camera, but only `input_type` is required. Available `input_type` options are: `webcam`, `rtsp`, `realsense` and `basler_usb`. +`config` can contain many details and settings about your camera, but only `input_type` is required. Available `input_type` options are: `generic_usb`, `rtsp`, `realsense` and `basler`. -Here's an example of a single webcam configured with several options: +Here's an example of a single USB camera configured with several options: ``` config = { - 'name': 'front door camera', - 'input_type': 'webcam', + 'name': 'Front Door Camera', + 'input_type': 'generic_usb', 'id': { 'serial_number': 23432570 }, @@ -92,7 +99,7 @@ grabber.release() You might have several cameras that you want to use in the same application. In this case, you can load the configurations from a yaml file and use `FrameGrabber.create_grabbers`. -If you have multiple cameras of the same type plugged in, it's recommended to provide serial numbers in the configurations; this ensures that each configuration is paired with the correct camera. If you don't provide serial numbers in your configurations, configurations will be paired with cameras in a sequential manner. +If you have multiple cameras of the same type plugged in, it's recommended that you include serial numbers in the configurations; this ensures that each configuration is paired with the correct camera. If you don't provide serial numbers in your configurations, configurations will be paired with cameras in a sequential manner. Below is a sample yaml file containing configurations for three different cameras. ``` @@ -107,17 +114,17 @@ GL_CAMERAS: | right: .8 - name: conference room input_type: rtsp - address: + id: rtsp_url: rtsp://admin:password@192.168.1.20/cam/realmonitor?channel=1&subtype=0 options: crop: - absolute: + pixels: top: 350 bottom: 1100 left: 1100 right: 2000 - name: workshop - input_type: webcam + input_type: generic_usb id: serial_number: B77D3A8F ``` @@ -138,12 +145,44 @@ grabbers = FrameGrabber.create_grabbers(configs) for grabber in grabbers.values(): print(grabber.config) frame = grabber.grab() - display_image(frame) + display_image(frame) # substitute this line for your preferred method of displaying images, such as cv2.imshow grabber.release() ``` -It is also possible to 'autodiscover' cameras. This will automatically connect to all cameras that are plugged into your machine, such as `webcam`, `realsense` and `basler_usb` cameras. Default configurations will be loaded for each camera. Please note that RTSP streams cannot be discovered in this manner; RTSP URLs must be specified in the configurations. +### Configurations +The table below shows all available configurations and the cameras to which they apply. +| Configuration Name | Example | Webcam | RTSP | Basler | Realsense | +|----------------------------|-----------------|------------|-----------|-----------|-----------| +| name | On Robot Arm | optional | optional | optional | optional | +| input_type | generic_usb | required | required | required | required | +| id.serial_number | 23458234 | optional | - | optional | optional | +| id.rtsp_url | rtsp://… | - | required | - | - | +| options.resolution.height | 480 | optional | - | - | optional | +| options.resolution.width | 640 | optional | - | - | optional | +| options.zoom.digital | 1.3 | optional | optional | optional | optional | +| options.crop.pixels.top | 100 | optional | optional | optional | optional | +| options.crop.pixels.bottom | 400 | optional | optional | optional | optional | +| options.crop.pixels.left | 100 | optional | optional | optional | optional | +| options.crop.pixels.right | 400 | optional | optional | optional | optional | +| options.crop.relative.top | 0.1 | optional | optional | optional | optional | +| options.crop.relative.bottom | 0.9 | optional | optional | optional | optional | +| options.crop.relative.left | 0.1 | optional | optional | optional | optional | +| options.crop.relative.right | 0.9 | optional | optional | optional | optional | +| options.depth.side_by_side | 1 | - | - | - | optional | + +In addition to the configurations in the table above, you can set any Basler camera property by including `options.basler.`. For example, it's common to set `options.basler.PixelFormat` to `RGB8`. + +### Autodiscovery +Autodiscovery automatically connects to all cameras that are plugged into your machine or discoverable on the network, including `generic_usb`, `realsense` and `basler` cameras. Default configurations will be loaded for each camera. Please note that RTSP streams cannot be discovered in this manner; RTSP URLs must be specified in the configurations. + +Autodiscovery is great for simple applications where you don't need to set any special options on your cameras. It's also a convenient method for finding the serial numbers of your cameras (if the serial number isn't printed on the camera). ``` grabbers = FrameGrabber.autodiscover() + +# Print some information about the discovered cameras +for grabber in grabbers.values(): + print(grabber.config) + + grabber.release() ``` ### Motion Detection diff --git a/pyproject.toml b/pyproject.toml index eaa7b07..f527a7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "framegrab" -version = "0.2.1" +version = "0.3" description = "Easily grab frames from cameras or streams" authors = ["Groundlight "] license = "MIT" @@ -8,10 +8,8 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.7" -opencv-python = "^4.7.0.72" -youtube-dl = "^2021.12.17" -pillow = "^9.5.0" -pafy = "^0.5.5" +opencv-python = "^4.4.0.46" +pyyaml = "^6.0.1" [tool.poetry.group.dev.dependencies] black = "^23.3.0" @@ -19,4 +17,4 @@ pytest = "^7.0.1" [build-system] requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" +build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/sample_scripts/autodiscover_demo.py b/sample_scripts/autodiscover_demo.py new file mode 100644 index 0000000..ef5fcb1 --- /dev/null +++ b/sample_scripts/autodiscover_demo.py @@ -0,0 +1,19 @@ +from framegrab import FrameGrabber + +print('Autodiscovering cameras...') + +grabbers = FrameGrabber.autodiscover() + +print('-' * 100) +print(f'Found {len(grabbers)} camera(s): {list(grabbers.keys())}') + +# Get a frame from each camera +for camera_name, grabber in grabbers.items(): + frame = grabber.grab() + + print(f'Grabbed frame from {camera_name} with shape {frame.shape}') + print(grabber.config) + + grabber.release() + +print('Autodiscover demo complete.') \ No newline at end of file diff --git a/sample_scripts/multicamera_demo.py b/sample_scripts/multicamera_demo.py new file mode 100644 index 0000000..4601c73 --- /dev/null +++ b/sample_scripts/multicamera_demo.py @@ -0,0 +1,33 @@ +from framegrab import FrameGrabber +import yaml +import cv2 + +# load the configurations from yaml +config_path = 'sample_config.yaml' +with open(config_path, 'r') as f: + data = yaml.safe_load(f) + configs = yaml.safe_load(data['GL_CAMERAS']) + +print('Loaded the following configurations from yaml:') +print(configs) + +# Create the grabbers +grabbers = FrameGrabber.create_grabbers(configs) + +while True: + # Get a frame from each camera + for camera_name, grabber in grabbers.items(): + frame = grabber.grab() + + cv2.imshow(camera_name, frame) + + key = cv2.waitKey(30) + + if key == ord('q'): + break + +cv2.destroyAllWindows() + +for grabber in grabbers.values(): + grabber.release() + \ No newline at end of file diff --git a/sample_scripts/sample_config.yaml b/sample_scripts/sample_config.yaml new file mode 100644 index 0000000..ac71c49 --- /dev/null +++ b/sample_scripts/sample_config.yaml @@ -0,0 +1,24 @@ +GL_CAMERAS: | + - name: Front Door + input_type: generic_usb + options: + zoom: + digital: 1.5 + - name: Conference Room + input_type: rtsp + id: + rtsp_url: rtsp://admin:password@10.0.0.0/cam/realmonitor?channel=1&subtype=0 + options: + crop: + relative: + top: .1 + bottom: .9 + left: .1 + right: .9 + - name: Workshop + input_type: basler + id: + serial_number: 12345678 + options: + basler: + ExposureTime: 60000 \ No newline at end of file diff --git a/sample_scripts/single_camera_demo.py b/sample_scripts/single_camera_demo.py new file mode 100644 index 0000000..ee6e2b6 --- /dev/null +++ b/sample_scripts/single_camera_demo.py @@ -0,0 +1,26 @@ +"""Finds a single USB camera (or built-in webcam) and displays its feed in a window. +Press 'q' to quit. +""" + +import cv2 +from framegrab import FrameGrabber + +config = { + 'name': 'My Camera', + 'input_type': 'generic_usb', +} + +grabber = FrameGrabber.create_grabber(config) + +while True: + frame = grabber.grab() + + cv2.imshow('FrameGrab Single-Camera Demo', frame) + + key = cv2.waitKey(30) + if key == ord('q'): + break + +cv2.destroyAllWindows() + +grabber.release() \ No newline at end of file diff --git a/src/framegrab/grabber.py b/src/framegrab/grabber.py index 501b08e..b034f28 100644 --- a/src/framegrab/grabber.py +++ b/src/framegrab/grabber.py @@ -10,30 +10,34 @@ import cv2 import numpy as np +from .unavailable_module import UnavailableModule + +logger = logging.getLogger(__name__) + # Optional imports try: from pypylon import pylon -except ImportError: - pylon = None +except ImportError as e: + pylon = UnavailableModule(e) try: from pyrealsense2 import pyrealsense2 as rs -except ImportError: - rs = None - -logger = logging.getLogger(__name__) +except ImportError as e: + rs = UnavailableModule(e) OPERATING_SYSTEM = platform.system() DIGITAL_ZOOM_MAX = 4 +NOISE = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8) # in case a camera can't get a frame class InputTypes: """Defines the available input types from FrameGrabber objects""" - WEBCAM = "webcam" + GENERIC_USB = "generic_usb" RTSP = "rtsp" REALSENSE = "realsense" - BASLER_USB = "basler_usb" + BASLER = "basler" + MOCK = "mock" def get_options() -> list: """Get a list of the available InputType options""" @@ -87,10 +91,19 @@ def create_grabbers(configs: List[dict]) -> dict: # serial number, and that no other FrameGrabbers claim that camera first. configs.sort(key=lambda config: "serial_number" not in config.get("id", {})) + # Do not allow duplicate camera names + names = [config.get("name", None) for config in configs if config.get("name", None) is not None] + if len(names) != len(set(names)): + raise ValueError( + f"Duplicate camera names were provided in configurations. Please ensure that each camera name is unique. " + f"Provided camera names: {names}" + ) + grabbers = {} for config in configs: grabber = FrameGrabber.create_grabber(config) - grabbers[config["name"]] = grabber + name = grabber.config["name"] + grabbers[name] = grabber return grabbers @@ -107,14 +120,16 @@ def create_grabber(config: dict): raise ValueError(f"No input_type provided. Valid types are {InputTypes.get_options()}") # Based on input_type, create correct type of FrameGrabber - if input_type == InputTypes.WEBCAM: - grabber = WebcamFrameGrabber(config) + if input_type == InputTypes.GENERIC_USB: + grabber = GenericUSBFrameGrabber(config) elif input_type == InputTypes.RTSP: grabber = RTSPFrameGrabber(config) - elif input_type == InputTypes.BASLER_USB: - grabber = BaslerUSBFrameGrabber(config) + elif input_type == InputTypes.BASLER: + grabber = BaslerFrameGrabber(config) elif input_type == InputTypes.REALSENSE: grabber = RealSenseFrameGrabber(config) + elif input_type == InputTypes.MOCK: + grabber = MockFrameGrabber(config) else: raise ValueError( f"The provided input_type ({input_type}) is not valid. Valid types are {InputTypes.get_options()}" @@ -136,13 +151,15 @@ def autodiscover() -> dict: """Autodiscovers cameras and returns a dictionary of FrameGrabber objects""" autodiscoverable_input_types = ( InputTypes.REALSENSE, - InputTypes.WEBCAM, - InputTypes.BASLER_USB, + InputTypes.GENERIC_USB, + InputTypes.BASLER, ) grabbers = {} for input_type in autodiscoverable_input_types: - while True: + for _ in range( + 100 + ): # an arbitrarily high value so that we look for enough cameras, but this never becomes an infinite loop try: config = {"input_type": input_type} grabber = FrameGrabber.create_grabber(config) @@ -165,22 +182,24 @@ def grab(self) -> np.ndarray: pass def _crop(self, frame: np.ndarray) -> np.ndarray: - """Looks at FrameGrabber's options and decides to either crop in an absolute manner (by pixels) or + """Looks at FrameGrabber's options and decides to either crop by pixels or in a relative manner (normalized). Returns a cropped frame. """ - absolute_crop_params = self.config.get("options", {}).get("crop", {}).get("absolute") - if absolute_crop_params: - return self._crop_absolute(frame, absolute_crop_params) + options = self.config.get("options", {}) - relative_crop_params = self.config.get("options", {}).get("crop", {}).get("relative") + relative_crop_params = options.get("crop", {}).get("relative") if relative_crop_params: return self._crop_relative(frame, relative_crop_params) + pixel_crop_params = options.get("crop", {}).get("pixels") + if pixel_crop_params: + return self._crop_pixels(frame, pixel_crop_params) + return frame - def _crop_absolute(self, frame: np.ndarray, crop_params: Dict[str, int]) -> np.ndarray: + def _crop_pixels(self, frame: np.ndarray, crop_params: Dict[str, int]) -> np.ndarray: """Crops the provided frame according to the FrameGrabbers cropping configuration. Crops according to pixels positions. """ @@ -209,13 +228,6 @@ def _digital_zoom(self, frame: np.ndarray) -> np.ndarray: if digital_zoom is None: pass - # TODO this condition for checking digital_zoom should eventually be moved to a - # function that validates the whole dictionary of options, perhaps using pydantic. Will add this later. - elif digital_zoom < 1 or digital_zoom > DIGITAL_ZOOM_MAX: - raise ValueError( - f"Invalid value for digital_zoom ({digital_zoom}). " - f"Digital zoom cannot be greater than {DIGITAL_ZOOM_MAX}." - ) else: top = (frame.shape[0] - frame.shape[0] / digital_zoom) / 2 bottom = frame.shape[0] - top @@ -239,9 +251,48 @@ def _set_cv2_resolution(self) -> None: if height: self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + def apply_options(self, options: dict) -> None: + """Update generic options such as crop and zoom as well as + camera-specific options. + """ + + # Ensure that the user hasn't provided pixel cropping parameters and relative cropping parameters + pixel_crop_params = options.get("crop", {}).get("pixels", {}) + relative_crop_params = options.get("crop", {}).get("relative", {}) + if pixel_crop_params and relative_crop_params: + raise ValueError( + f"Pixel cropping parameters and relative cropping parameters were set for " + f"{self.config['name']}. Pixel cropping and absolute cropping cannot be " + f"used together. Please adjust your configurations to use one or the other." + ) + + # Ensure valid relative cropping parameters (between 0 and 1) + for param_name, param_value in relative_crop_params.items(): + if param_value < 0 or param_value > 1: + camera_name = self.config.get("name", "Unnamed Camera") + raise ValueError( + f"Relative cropping parameter ({param_name}) on {camera_name} is {param_value}, which is invalid. " + f"Relative cropping parameters must be between 0 and 1, where 1 represents the full " + f"width or length of the image. " + ) + + # Validate digital zoom level + digital_zoom = options.get("zoom", {}).get("digital") + if digital_zoom and (digital_zoom < 1 or digital_zoom > DIGITAL_ZOOM_MAX): + raise ValueError( + f"Invalid value for digital_zoom ({digital_zoom}). " + f"Digital zoom must >= 1 and <= {DIGITAL_ZOOM_MAX}." + ) + + # Apply camera specific options + self._apply_camera_specific_options(options) + + # Save the options to the config + self.config["options"] = options + @abstractmethod - def apply_options(options: dict) -> None: - """Update any camera-specific options, such as resolution or exposure""" + def _apply_camera_specific_options(options: dict) -> None: + """Update any camera-specific options, such as resolution, exposure_us, pixel_format, etc.""" pass @abstractmethod @@ -250,9 +301,10 @@ def release() -> None: pass -class WebcamFrameGrabber(FrameGrabber): - """For any generic webcam""" +class GenericUSBFrameGrabber(FrameGrabber): + """For any generic USB camera, such as a webcam""" + # keep track of the cameras that are already in use so that we don't try to connect to them twice indices_in_use = set() def __init__(self, config: dict): @@ -262,53 +314,64 @@ def __init__(self, config: dict): if serial_number and OPERATING_SYSTEM != "Linux": logger.warning( - f"Matching webcams with serial_number is not supported on your operating system, {OPERATING_SYSTEM}. " - "Webcams will be sequentially assigned instead." + f"Matching USB cameras with serial_number is not supported on your operating system, {OPERATING_SYSTEM}. " + "Cameras will be sequentially assigned instead." ) - # Assign camera based on serial number if serial was provided - if serial_number and OPERATING_SYSTEM == "Linux": - found_webcam_devnames = WebcamFrameGrabber._find_webcam_devnames() - for devname, curr_serial_number in found_webcam_devnames.items(): - if curr_serial_number == serial_number: - # Extract the index from the device name, e.g. /dev/video0 -> 0 - # This might only work on Linux, and should be re-evaluated if we add serial number - # recognition for other operating systems - idx = int(re.findall(r"\d+", devname)[-1]) - - if idx in WebcamFrameGrabber.indices_in_use: - raise ValueError( - f"Webcam index {idx} already in use. " - f"Did you use the same serial number ({serial_number}) for two different cameras?" - ) - - capture = cv2.VideoCapture(idx) - if capture.isOpened(): - break # Found a valid capture, no need to look any further + # Find the serial number of connected cameras. Currently only works on Linux. + if OPERATING_SYSTEM == "Linux": + found_cams = GenericUSBFrameGrabber._find_cameras() + else: + found_cams = {} + + # Assign camera based on serial number if 1) serial_number was provided and 2) we know the + # serial numbers of plugged in devices + if serial_number and found_cams: + for found_cam in found_cams: + if serial_number != found_cam["serial_number"]: + continue + + idx = found_cam["idx"] + if idx in GenericUSBFrameGrabber.indices_in_use: + raise ValueError( + f"USB camera index {idx} already in use. " + f"Did you use the same serial number ({serial_number}) for two different cameras?" + ) + + capture = cv2.VideoCapture(idx) + if capture.isOpened(): + break # Found a valid capture, no need to look any further else: raise ValueError( - f"Unable to find webcam with the specified serial_number: {serial_number}. " - "Please ensure that the serial number is correct and that the webcam is plugged in." + f"Unable to find USB camera with the specified serial_number: {serial_number}. " + "Please ensure that the serial number is correct and that the camera is plugged in." ) # If no serial number was provided, just assign the next available camera by index else: - for idx in range(20): - if idx in WebcamFrameGrabber.indices_in_use: - continue # Webcam is already in use, moving on + for idx in range(20): # an arbitrarily high number to make sure we check for enough cams + if idx in GenericUSBFrameGrabber.indices_in_use: + continue # Camera is already in use, moving on capture = cv2.VideoCapture(idx) if capture.isOpened(): break # Found a valid capture, no need to look any further else: - raise ValueError(f"Unable to connect to webcam by index. Is your webcam plugged in?") + raise ValueError(f"Unable to connect to USB camera by index. Is your camera plugged in?") + + # If a serial_number wasn't provided by the user, attempt to find it and add it to the config + if not serial_number: + for found_cam in found_cams: + if idx == found_cam["idx"]: + self.config["id"] = {"serial_number": found_cam["serial_number"]} + break # A valid capture has been found, saving it for later self.capture = capture - # Log the current webcam index as 'in use' to prevent other WebcamFrameGrabbers from stepping on it + # Log the current camera index as 'in use' to prevent other GenericUSBFrameGrabbers from stepping on it self.idx = idx - WebcamFrameGrabber.indices_in_use.add(idx) + GenericUSBFrameGrabber.indices_in_use.add(idx) def grab(self) -> np.ndarray: _, frame = self.capture.read() @@ -318,32 +381,32 @@ def grab(self) -> np.ndarray: def release(self) -> None: self.capture.release() - WebcamFrameGrabber.indices_in_use.remove(self.idx) + GenericUSBFrameGrabber.indices_in_use.remove(self.idx) - def apply_options(self, options: dict) -> None: - self.config["options"] = options + def _apply_camera_specific_options(self, options: dict) -> None: self._set_cv2_resolution() # set the buffer size to 1 to always get the most recent frame self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 1) @staticmethod - def _find_webcam_devnames() -> dict: - """Finds all plugged in webcams and returns a dictionary mapping device names to - to serial numbers. This is useful for connecting the dots between user provided configurations + def _find_cameras() -> list: + """Attempts to finds all USB cameras and returns a list dictionaries, each dictionary containing + information about a camera, including: serial_number, device name, index, etc. + This is useful for connecting the dots between user provided configurations and actual plugged in devices. This function only works on Linux, and was specifically tested on an Nvidia Jetson. """ - # ls /dev/video* returns device paths for all plugged in webcams + # ls /dev/video* returns device paths for all detected cameras command = "ls /dev/video*" process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) stdout, _ = process.communicate() output = stdout.decode("utf-8") devices = output.strip().split("\n") - plugged_in_devices = {} + found_cams = [] for devpath in devices: # ls -l /sys/class/video4linux/video0/device returns a path that points back into the /sys/bus/usb/devices/ # directory where can determine the serial number. @@ -361,27 +424,33 @@ def _find_webcam_devnames() -> dict: stdout, _ = process.communicate() serial_number = stdout.decode("utf-8").strip() + # find the index + idx = int(re.findall(r"\d+", devname)[-1]) + if serial_number: - plugged_in_devices[devpath] = serial_number + found_cams.append( + { + "serial_number": serial_number, + "devname": f"/dev/{devname}", + "idx": idx, + } + ) - return plugged_in_devices + return found_cams class RTSPFrameGrabber(FrameGrabber): - """Grabs the most recent frame from an rtsp stream. The RTSP capture - object has a non-configurable built-in buffer, so just calling - grab would return the oldest frame in the buffer rather than the - latest frame. This class uses a thread to continously drain the - buffer by grabbing and discarding frames and only returning the - latest frame when explicitly requested. - """ + """Realtime Streaming Protocol Cameras""" def __init__(self, config: dict): self.config = config - stream = self.config.get("address", {}).get("rtsp_url") + stream = self.config.get("id", {}).get("rtsp_url") if not stream: - raise ValueError(f"No RTSP URL provided. " "Please add an address attribute to the configuration.") + camera_name = self.config.get("name", "Unnamed RTSP Stream") + raise ValueError( + f"No RTSP URL provided for {camera_name}. Please add an rtsp_url attribute to the configuration." + ) self.capture = cv2.VideoCapture(stream) if not self.capture.isOpened(): @@ -394,11 +463,17 @@ def __init__(self, config: dict): self.run = True self.lock = Lock() + + # The _drain thread needs to periodically wait to avoid overloading the CPU. Ideally this would be done + # at the rate of the RTSP feed's FPS. Unfortunately, OpenCV cannot consistently read the FPS of an RTSP + # feed, so we will assume a high FPS of 60. + self.drain_rate = 1 / 60 + Thread(target=self._drain).start() def grab(self) -> np.ndarray: with self.lock: - ret, frame = self.capture.read() # grab and decode since we want this frame + ret, frame = self.capture.retrieve() # grab and decode since we want this frame if not ret: logger.error(f"Could not read frame from {self.capture}") @@ -413,9 +488,11 @@ def release(self) -> None: with self.lock: self.capture.release() - def apply_options(self, options: dict) -> None: - self.config["options"] = options - self._set_cv2_resolution() + def _apply_camera_specific_options(self, options: dict) -> None: + if options.get("resolution"): + raise ValueError( + f"Resolution was set for {self.config['name']}, but resolution cannot be set for RTSP streams." + ) def _drain(self) -> None: """Repeatedly grabs frames without decoding them. @@ -426,97 +503,109 @@ def _drain(self) -> None: with self.lock: _ = self.capture.grab() + # Sleep with each iteration in order to not hog the CPU + time.sleep(self.drain_rate) + -class BaslerUSBFrameGrabber(FrameGrabber): - """Basler USB Camera""" +class BaslerFrameGrabber(FrameGrabber): + """Basler USB and Basler GigE Cameras""" serial_numbers_in_use = set() def __init__(self, config: dict): - if pylon is None: - raise ImportError( - "Using Basler USB cameras requires the pypylon package, which is not installed on this system. " - "Please install pypylon and try again." - ) - self.config = config + # Basler cameras grab frames in different pixel formats, most of which cannot be displayed directly + # by OpenCV. self.convert will convert them to BGR which can be used by OpenCV + self.converter = pylon.ImageFormatConverter() + self.converter.OutputPixelFormat = pylon.PixelType_BGR8packed + self.converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned + tlf = pylon.TlFactory.GetInstance() devices = tlf.EnumerateDevices() if not devices: - raise ValueError("No Basler USB cameras were found. Is your camera plugged in?") + raise ValueError("No Basler cameras were found. Is your camera connected?") # Attempt to match the provided serial number with a plugged in device. If no serial number was provided, just # pick the first found device that is not currently in use. serial_number = config.get("id", {}).get("serial_number") for device in devices: curr_serial_number = device.GetSerialNumber() - if curr_serial_number in BaslerUSBFrameGrabber.serial_numbers_in_use: + if curr_serial_number in BaslerFrameGrabber.serial_numbers_in_use: continue if serial_number is None or serial_number == curr_serial_number: camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateDevice(device)) camera.Open() - logger.info(f"Connected to Basler USB camera with serial number {curr_serial_number}.") + logger.info(f"Connected to Basler camera with serial number {curr_serial_number}.") break else: raise ValueError( - f"Unable to connect to Basler USB camera with serial number: {serial_number}. " - "Please verify that the camera is plugged in and that the serial number is correct." + f"Unable to connect to Basler camera with serial number: {serial_number}. " + "Please verify that the camera is connected and that the serial number is correct." ) + # In case the serial_number wasn't provided by the user, add it to the config + self.config["id"] = {"serial_number": curr_serial_number} + # A valid camera has been found, remember the serial_number to prevent # other FrameGrabbers from using it self.camera = camera - self.serial_number = curr_serial_number - BaslerUSBFrameGrabber.serial_numbers_in_use.add(self.serial_number) + BaslerFrameGrabber.serial_numbers_in_use.add(self.config["id"]["serial_number"]) def grab(self) -> np.ndarray: with self.camera.GrabOne(2000) as result: if result.GrabSucceeded(): - frame = result.GetArray() - else: - frame = None + # Convert the image to BGR for OpenCV + image = self.converter.Convert(result) + frame = image.GetArray() - frame = self._crop(frame) - frame = self._digital_zoom(frame) + # crop and zoom + frame = self._crop(frame) + frame = self._digital_zoom(frame) + else: + error_info = { + "ErrorCode": result.GetErrorCode(), + "PayloadSize": result.GetPayloadSize(), + "ID": result.GetID(), + "BlockID": result.GetBlockID(), + "Width": result.GetWidth(), + "Height": result.GetHeight(), + "PixelType": result.GetPixelType(), + "ErrorDescription": result.GetErrorDescription(), + } + + error_message = "\n".join(f"{k}: {v}" for k, v in error_info.items()) + + logger.warning( + f"Could not grab a frame from {self.config['name']}\n" + f"{error_message}\n" + f"---------------------------------------------------\n" + ) - frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + frame = NOISE return frame def release(self) -> None: self.camera.Close() - BaslerUSBFrameGrabber.serial_numbers_in_use.remove(self.serial_number) - - def apply_options(self, options: dict) -> None: - self.config["options"] = options - - if self.pixel_format: - self.camera.PixelFormat.SetValue(self.pixel_format) - - if self.exposure: - self.camera.ExposureTime.SetValue(self.exposure) + BaslerFrameGrabber.serial_numbers_in_use.remove(self.config["id"]["serial_number"]) - @property - def pixel_format(self): - return self.config.get("options", {}).get("pixel_format") + def _apply_camera_specific_options(self, options: dict) -> None: + if options.get("resolution"): + raise ValueError("FrameGrab does not support setting resolution on Basler cameras.") - @property - def exposure(self): - return self.config.get("options", {}).get("exposure") + basler_options = options.get("basler", {}) + node_map = self.camera.GetNodeMap() + for property_name, value in basler_options.items(): + node = node_map.GetNode(property_name) + node.SetValue(value) class RealSenseFrameGrabber(FrameGrabber): """Intel RealSense Depth Camera""" def __init__(self, config: dict): - if rs is None: - raise ImportError( - "Using IntelRealSense cameras requires the pyrealsense2 package, which is not installed on this system. " - "Please install pyrealsense2 and try again." - ) - self.config = config ctx = rs.context() @@ -540,41 +629,42 @@ def __init__(self, config: dict): pipeline_profile = pipeline.start(rs_config) break # succesfully connected, breaking out of loop except RuntimeError as e: - # the current camera is not available, moving on to the next + # The current camera is not available, moving on to the next continue - else: raise ValueError( f"Unable to connect to Intel RealSense camera with serial_number: {provided_serial_number}. " "Is the serial number correct? Is the camera plugged in?" ) - # A valid pipeline was found, saving for later + # A valid pipeline was found, save the pipeline and RealSense config for later self.pipeline = pipeline + self.rs_config = rs_config + + # In case the serial_number wasn't provided by the user, add it to the config + self.config["id"] = {"serial_number": curr_serial_number} def grab(self) -> np.ndarray: frames = self.pipeline.wait_for_frames() - depth_frame = frames.get_depth_frame() - color_frame = frames.get_color_frame() - # Convert images to numpy arrays and convet from RGB to BGR - depth_image = np.asanyarray(depth_frame.get_data()) + # Convert color images to numpy arrays and convert from RGB to BGR + color_frame = frames.get_color_frame() color_image = cv2.cvtColor(np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB) - - depth_image = self._crop(depth_image) - depth_image = self._digital_zoom(depth_image) - color_image = self._crop(color_image) color_image = self._digital_zoom(color_image) - # side by side + # If side_by_side is enabled, get a depth frame and horizontally stack it with color frame display_side_by_side = self.config.get("options", {}).get("depth", {}).get("side_by_side") if display_side_by_side: - return self._side_by_side(depth_image, color_image) + depth_frame = frames.get_depth_frame() + depth_image = np.asanyarray(depth_frame.get_data()) + depth_image = self._crop(depth_image) + depth_image = self._digital_zoom(depth_image) + return self._horizontally_stack(depth_image, color_image) else: return color_image - def _side_by_side(self, depth_image: np.ndarray, color_image: np.ndarray) -> np.ndarray: + def _horizontally_stack(self, depth_image: np.ndarray, color_image: np.ndarray) -> np.ndarray: """Merges color image and depth image into a wider image, all in RGB""" # Apply colormap on depth image (image must be converted to 8-bit per pixel first) @@ -595,26 +685,73 @@ def _side_by_side(self, depth_image: np.ndarray, color_image: np.ndarray) -> np. def release(self) -> None: self.pipeline.stop() - def apply_options(self, options: dict) -> None: - self.config["options"] = options + def _apply_camera_specific_options(self, options: dict) -> None: + # Some special handling for changing the resolution of Intel RealSense cameras + new_width = options.get("resolution", {}).get("width") + new_height = options.get("resolution", {}).get("height") + if (new_width and not new_height) or (not new_width and new_height): + camera_name = self.config.get("name", "Unnamed RealSense Camera") + raise ValueError( + f"Invalid resolution settings for {camera_name}. Please provide both a width and a height." + ) + elif new_width and new_height: + self.pipeline.stop() # pipeline needs to be temporarily stopped in order to change the resolution + self.rs_config.enable_stream(rs.stream.color, new_width, new_height) + self.rs_config.enable_stream(rs.stream.depth, new_width, new_height) + self.pipeline.start(self.rs_config) # Restart the pipeline with the new configuration + else: + # If the user didn't provide a resolution, do nothing + pass -# # TODO create this class -# class GigEFrameGrabber(FrameGrabber): -# """GigE Camera -# """ +class MockFrameGrabber(FrameGrabber): + """A mock camera class for testing purposes""" + + # Represents the serial numbers of the mock cameras that are discoverable + available_serial_numbers = ("123", "456", "789") + + # Keeps track of the available serial numbers so that we don't try to connect to them twice + serial_numbers_in_use = set() + + def __init__(self, config: dict): + self.config = config + + provided_serial_number = self.config.get("id", {}).get("serial_number") + + # Iterate through each detected camera and attempt to match it with the provided camera config + for curr_serial_number in MockFrameGrabber.available_serial_numbers: + if curr_serial_number in MockFrameGrabber.serial_numbers_in_use: + continue # this camera is already in use, moving on to the next + if provided_serial_number is None or curr_serial_number == provided_serial_number: + break # succesfully connected, breaking out of loop + else: + raise ValueError( + f"Unable to connect to MockFrameGrabber with serial_number: {provided_serial_number}. " + f"Is the serial number correct? Available serial numbers are {MockFrameGrabber.available_serial_numbers}" + ) + + MockFrameGrabber.serial_numbers_in_use.add(curr_serial_number) + + # In case the serial_number wasn't provided by the user, add it to the config + self.config["id"] = {"serial_number": curr_serial_number} + + def grab(self) -> np.ndarray: + width = self.config.get("options", {}).get("resolution", {}).get("width", 640) + height = self.config.get("options", {}).get("resolution", {}).get("height", 480) -# def __init__(self, config: dict): -# self.config = config + frame = np.zeros((height, width, 3), dtype=np.uint8) -# def grab(self) -> np.ndarray: -# pass + frame = self._crop(frame) + frame = self._digital_zoom(frame) + + return frame + + def release(self) -> None: + MockFrameGrabber.serial_numbers_in_use.remove(self.config["id"]["serial_number"]) -# def release(self) -> None: -# pass + def _apply_camera_specific_options(self, options: dict) -> None: + pass # no action necessary for mock camera -# def apply_options(self, options: dict) -> None: -# self.config['options'] = options # # TODO update this class to work with the latest updates # import os diff --git a/src/framegrab/unavailable_module.py b/src/framegrab/unavailable_module.py new file mode 100644 index 0000000..388f2b7 --- /dev/null +++ b/src/framegrab/unavailable_module.py @@ -0,0 +1,18 @@ +class UnavailableModule: + """Useful for optional dependencies. If an optional dependency fails to be + imported, create an UnavailableModule instance and use it as a placeholder. + Attempting to do anything with the UnavailableModule instance will raise + the original exception. + + In this way, we don't bother the user about optional dependencies failing + to import until it becomes relevant. + """ + + def __init__(self, e: Exception): + self.e = e # save the original exception for later + + def __getattr__(self, name): + """Raise the original exception when the user tries to do anything with + an instance of this class. + """ + raise self.e diff --git a/test/test_framegrab_with_mock_camera.py b/test/test_framegrab_with_mock_camera.py new file mode 100644 index 0000000..0527fde --- /dev/null +++ b/test/test_framegrab_with_mock_camera.py @@ -0,0 +1,176 @@ +"""A suite of tests that can be run without any physical cameras. +Intended to check basic functionality like cropping, zooming, config validation, etc. +""" + +import unittest +from framegrab.grabber import FrameGrabber + +class TestFrameGrabWithMockCamera(unittest.TestCase): + def test_crop_pixels(self): + """Grab a frame, crop a frame by pixels, and make sure the shape is correct. + """ + config = { + 'name': 'mock_camera', + 'input_type': 'mock', + 'options': { + 'resolution': { + 'width': 640, + 'height': 480, + }, + 'crop': { + 'pixels': { + 'top': 40, + 'bottom': 440, + 'left': 120, + 'right': 520, + } + } + } + } + grabber = FrameGrabber.create_grabber(config) + + frame = grabber.grab() + + grabber.release() + + assert frame.shape == (400, 400, 3) + + def test_crop_relative(self): + """Grab a frame, crop a frame in an relative manner (0 to 1), and make sure the shape is correct. + """ + config = { + 'name': 'mock_camera', + 'input_type': 'mock', + 'options': { + 'resolution': { + 'width': 640, + 'height': 480, + }, + 'crop': { + 'relative': { + 'top': .1, + 'bottom': .9, + 'left': .1, + 'right': .9, + } + } + } + } + grabber = FrameGrabber.create_grabber(config) + + frame = grabber.grab() + + grabber.release() + + assert frame.shape == (384, 512, 3) + + def test_zoom(self): + """Grab a frame, zoom a frame, and make sure the shape is correct. + """ + config = { + 'name': 'mock_camera', + 'input_type': 'mock', + 'options': { + 'resolution': { + 'width': 640, + 'height': 480, + }, + 'zoom': { + 'digital': 2, + } + } + } + grabber = FrameGrabber.create_grabber(config) + + frame = grabber.grab() + + grabber.release() + + assert frame.shape == (240, 320, 3) + + def test_attempt_create_grabber_with_invalid_input_type(self): + config = { + 'input_type': 'some_invalid_camera_type' + } + + with self.assertRaises(ValueError): + FrameGrabber.create_grabber(config) + + def test_create_grabber_without_name(self): + config = { + 'input_type': 'mock' + } + + grabber = FrameGrabber.create_grabber(config) + + # Check that some camera name was added + assert len(grabber.config['name']) > 2 + + grabber.release() + + def test_create_grabber_with_name(self): + user_provided_name = 'my_camera' + + config = { + 'name': user_provided_name, + 'input_type': 'mock', + } + + grabber = FrameGrabber.create_grabber(config) + + assert grabber.config['name'] == user_provided_name + + grabber.release() + + def test_create_grabbers_without_names(self): + configs = [ + {'input_type': 'mock'}, + {'input_type': 'mock'}, + {'input_type': 'mock'}, + ] + + grabbers = FrameGrabber.create_grabbers(configs) + + grabber_names = set([grabber.config['name'] for grabber in grabbers.values()]) + + # Make sure all the grabbers have unique names + assert len(configs) == len(grabber_names) + + for grabber in grabbers.values(): + grabber.release() + + def test_attempt_create_more_grabbers_than_exist(self): + """Try to provide a config with more cameras than are actually plugged in. + The MockFrameGrabber class implicitly only has three cameras that can be discovered. + """ + + # Connect to 3 grabbers, this should be fine + configs = [ + {'input_type': 'mock'}, + {'input_type': 'mock'}, + {'input_type': 'mock'}, + ] + + grabbers = FrameGrabber.create_grabbers(configs) + + # Try to connect to another grabber, this should raise an exception because there are only 3 mock cameras available + try: + FrameGrabber.create_grabber({'input_type': 'mock'}) + self.fail() + except ValueError: + pass + finally: + # release all the grabbers + for grabber in grabbers.values(): + grabber.release() + + def test_attempt_create_grabbers_with_duplicate_names(self): + configs = [ + {'name': 'camera1', 'input_type': 'mock'}, + {'name': 'camera2', 'input_type': 'mock'}, + {'name': 'camera1', 'input_type': 'mock'}, + ] + + # Should raise an exception because camera1 is duplicated + with self.assertRaises(ValueError): + FrameGrabber.create_grabbers(configs)