diff --git a/README.md b/README.md index a27dadc..7cceb01 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ lists the sub-commands, including `autodiscover` and `preview`. Frame Grabbers are defined by a configuration dict which is usually stored as YAML. The configuration combines the camera type, the camera ID, and the camera options. The configuration is passed to the `FrameGrabber.create_grabber` method to create a grabber object. The grabber object can then be used to grab frames from the camera. -`config` can contain many details and settings about your camera, but only `input_type` is required. Available `input_type` options are: `generic_usb`, `rtsp`, `realsense`, `basler`, and `rpi_csi2`. +`config` can contain many details and settings about your camera, but only `input_type` is required. Available `input_type` options are: `generic_usb`, `rtsp`, `realsense`, `basler`, `rpi_csi2`, `hls`, and `youtube_live`. Here's an example of a single USB camera configured with several options: ```python @@ -174,8 +174,8 @@ The table below shows all available configurations and the cameras to which they | input_type | generic_usb | required | required | required | required | required | required | required | | id.serial_number | 23458234 | optional | - | optional | optional | - | - | - | | id.rtsp_url | rtsp://… | - | required | - | - | - | - | - | -| id.hls_url | https://... | - | - | - | - | - | required | - | -| id.youtube_url | https://... | - | - | - | - | - | - | required | +| id.hls_url | https://.../*.m3u8 | - | - | - | - | - | required | - | +| id.youtube_url | https://www.youtube.com/watch?v=... | - | - | - | - | - | - | required | | options.resolution.height | 480 | optional | - | - | optional | - | - | - | | options.resolution.width | 640 | optional | - | - | optional | - | - | - | | options.zoom.digital | 1.3 | optional | optional | optional | optional | optional | optional | optional | @@ -189,7 +189,7 @@ The table below shows all available configurations and the cameras to which they | options.crop.relative.right | 0.9 | optional | optional | optional | optional | optional | optional | optional | | options.depth.side_by_side | 1 | - | - | - | optional | - | - | - | | options.num_90_deg_rotations | 2 | optional | optional | optional | optional | optional | optional | optional | -| options.keep_connection_open | True | - | optional | - | - | - | - | - | +| options.keep_connection_open | True | - | optional | - | - | - | optional | optional | | options.max_fps | 30 | - | optional | - | - | - | - | - | @@ -318,6 +318,8 @@ if frame is None: # For example, display it using cv2.imshow() # For example, save it to a file cv2.imwrite('youtube_frame.jpg', frame) + +grabber.release() ``` ## Contributing diff --git a/src/framegrab/grabber.py b/src/framegrab/grabber.py index 27c5f80..c5b9ee5 100644 --- a/src/framegrab/grabber.py +++ b/src/framegrab/grabber.py @@ -46,7 +46,9 @@ OPERATING_SYSTEM = platform.system() DIGITAL_ZOOM_MAX = 4 -NOISE = np.random.randint(0, 256, (480, 640, 3), dtype=np.uint8) # in case a camera can't get a frame +NOISE = np.random.randint( + 0, 256, (480, 640, 3), dtype=np.uint8 +) # in case a camera can't get a frame class InputTypes: @@ -84,7 +86,9 @@ def _validate_config(config: dict) -> dict: # Ensure that serial numbers are strings try: - output_config["id"]["serial_number"] = str(output_config["id"]["serial_number"]) + output_config["id"]["serial_number"] = str( + output_config["id"]["serial_number"] + ) except KeyError: pass @@ -95,7 +99,9 @@ def _validate_config(config: dict) -> dict: return output_config @staticmethod - def create_grabbers(configs: List[dict], warmup_delay: float = 1.0) -> Dict[str, "FrameGrabber"]: + def create_grabbers( + configs: List[dict], warmup_delay: float = 1.0 + ) -> Dict[str, "FrameGrabber"]: """ Creates multiple FrameGrab objects based on user-provided configurations @@ -119,7 +125,11 @@ def create_grabbers(configs: List[dict], warmup_delay: float = 1.0) -> Dict[str, configs.sort(key=lambda config: "serial_number" not in config.get("id", {})) # Do not allow duplicate camera names - names = [config.get("name", None) for config in configs if config.get("name", None) is not None] + names = [ + config.get("name", None) + for config in configs + if config.get("name", None) is not None + ] if len(names) != len(set(names)): raise ValueError( f"Duplicate camera names were provided in configurations. Please ensure that each camera name is unique. " @@ -129,13 +139,17 @@ def create_grabbers(configs: List[dict], warmup_delay: float = 1.0) -> Dict[str, # Create the grabbers grabber_list = [] for config in configs: - grabber = FrameGrabber.create_grabber(config, autogenerate_name=False, warmup_delay=0) + grabber = FrameGrabber.create_grabber( + config, autogenerate_name=False, warmup_delay=0 + ) grabber_list.append(grabber) grabbers = FrameGrabber.grabbers_to_dict(grabber_list) # Do the warmup delay if necessary - grabber_types = set([grabber.config["input_type"] for grabber in grabbers.values()]) + grabber_types = set( + [grabber.config["input_type"] for grabber in grabbers.values()] + ) if InputTypes.GENERIC_USB in grabber_types and warmup_delay > 0: logger.info( f"Waiting {warmup_delay} seconds for camera(s) to warm up. " @@ -146,7 +160,9 @@ def create_grabbers(configs: List[dict], warmup_delay: float = 1.0) -> Dict[str, return grabbers @staticmethod - def from_yaml(filename: Optional[str] = None, yaml_str: Optional[str] = None) -> List["FrameGrabber"]: + def from_yaml( + filename: Optional[str] = None, yaml_str: Optional[str] = None + ) -> List["FrameGrabber"]: """Creates multiple FrameGrabber objects based on a YAML file or YAML string. Args: @@ -167,7 +183,9 @@ def from_yaml(filename: Optional[str] = None, yaml_str: Optional[str] = None) -> yaml_str = f.read() full_config = yaml.safe_load(yaml_str) if "image_sources" not in full_config: - raise ValueError("Invalid config file. Camera configs must be under the 'image_sources' key.") + raise ValueError( + "Invalid config file. Camera configs must be under the 'image_sources' key." + ) image_sources = full_config["image_sources"] # Check that it's a list. if image_sources is None or not isinstance(image_sources, list): @@ -205,7 +223,9 @@ def grabbers_to_dict(grabber_list: list) -> dict: return grabbers @staticmethod - def create_grabber_yaml(yaml_config: str, autogenerate_name: bool = True, warmup_delay: float = 1.0): + def create_grabber_yaml( + yaml_config: str, autogenerate_name: bool = True, warmup_delay: float = 1.0 + ): """Create a FrameGrabber object based on the provided configuration. Parameters: @@ -230,7 +250,9 @@ def create_grabber_yaml(yaml_config: str, autogenerate_name: bool = True, warmup return grabber @staticmethod - def create_grabber(config: dict, autogenerate_name: bool = True, warmup_delay: float = 1.0): + def create_grabber( + config: dict, autogenerate_name: bool = True, warmup_delay: float = 1.0 + ): """Create a FrameGrabber object based on the provided configuration. Parameters: @@ -258,7 +280,9 @@ def create_grabber(config: dict, autogenerate_name: bool = True, warmup_delay: f # At a minimum, input_type must be provided input_type = config.get("input_type", None) if input_type is None: - raise ValueError(f"No input_type provided. Valid types are {InputTypes.get_options()}") + raise ValueError( + f"No input_type provided. Valid types are {InputTypes.get_options()}" + ) # Based on input_type, create correct type of FrameGrabber if input_type == InputTypes.GENERIC_USB: @@ -337,7 +361,9 @@ def autodiscover( # If the input type is RTSP and rtsp_discover_modes is provided, use RTSPDiscovery to find the cameras if input_type == InputTypes.RTSP: if rtsp_discover_mode is not None: - onvif_devices = RTSPDiscovery.discover_onvif_devices(auto_discover_mode=rtsp_discover_mode) + onvif_devices = RTSPDiscovery.discover_onvif_devices( + auto_discover_mode=rtsp_discover_mode + ) for device in onvif_devices: for index, rtsp_url in enumerate(device.rtsp_urls): grabber = FrameGrabber.create_grabber( @@ -357,7 +383,9 @@ def autodiscover( ): # an arbitrarily high value so that we look for enough cameras, but this never becomes an infinite loop try: config = {"input_type": input_type} - grabber = FrameGrabber.create_grabber(config, autogenerate_name=False, warmup_delay=0) + grabber = FrameGrabber.create_grabber( + config, autogenerate_name=False, warmup_delay=0 + ) grabber_list.append(grabber) except (ValueError, ImportError): # ValueError is taken to mean that we have reached the end of enumeration for the current input_type. @@ -368,7 +396,9 @@ def autodiscover( grabbers = FrameGrabber.grabbers_to_dict(grabber_list) # Do the warmup delay if necessary - grabber_types = set([grabber.config["input_type"] for grabber in grabbers.values()]) + grabber_types = set( + [grabber.config["input_type"] for grabber in grabbers.values()] + ) if InputTypes.GENERIC_USB in grabber_types and warmup_delay > 0: logger.info( f"Waiting {warmup_delay} seconds for camera(s) to warm up. " @@ -390,7 +420,9 @@ def grab(self) -> np.ndarray: frame = self._grab_implementation() if frame is None: - name = self.config["name"] # all grabbers should have a name, either user-provided or generated + name = self.config[ + "name" + ] # all grabbers should have a name, either user-provided or generated error_msg = f"Failed to grab frame from {name}" raise GrabError(error_msg) @@ -442,7 +474,9 @@ def _crop(self, frame: np.ndarray) -> np.ndarray: return frame - def _crop_pixels(self, frame: np.ndarray, crop_params: Dict[str, int]) -> np.ndarray: + def _crop_pixels( + self, frame: np.ndarray, crop_params: Dict[str, int] + ) -> np.ndarray: """Crops the provided frame according to the FrameGrabbers cropping configuration. Crops according to pixels positions. """ @@ -454,7 +488,9 @@ def _crop_pixels(self, frame: np.ndarray, crop_params: Dict[str, int]) -> np.nda return frame - def _crop_relative(self, frame: np.ndarray, crop_params: Dict[str, float]) -> np.ndarray: + def _crop_relative( + self, frame: np.ndarray, crop_params: Dict[str, float] + ) -> np.ndarray: """Crops the provided frame according to the FrameGrabbers cropping configuration. Crops according to relative positions (0-1). """ @@ -483,7 +519,9 @@ def _digital_zoom(self, frame: np.ndarray) -> np.ndarray: def _rotate(self, frame: np.ndarray) -> np.ndarray: """Rotates the provided frame a specified number of 90 degree rotations clockwise""" - num_90_deg_rotations = self.config.get("options", {}).get("num_90_deg_rotations", 0) + num_90_deg_rotations = self.config.get("options", {}).get( + "num_90_deg_rotations", 0 + ) for n in range(num_90_deg_rotations): frame = np.rot90(frame) @@ -592,7 +630,9 @@ def __init__(self, config: dict): # Assign camera based on serial number if 1) serial_number was provided and 2) we know the # serial numbers of plugged in devices if found_cams: - logger.debug(f"Found {len(found_cams)} USB cameras with Linux commands. Assigning camera by serial number.") + logger.debug( + f"Found {len(found_cams)} USB cameras with Linux commands. Assigning camera by serial number." + ) for found_cam in found_cams: if serial_number and serial_number != found_cam["serial_number"]: continue @@ -612,8 +652,12 @@ def __init__(self, config: dict): ) # If we don't know the serial numbers of the cameras, just assign the next available camera by index else: - logger.debug("No USB cameras found with Linux commands. Assigning camera by index.") - for idx in range(20): # an arbitrarily high number to make sure we check for enough cams + logger.debug( + "No USB cameras found with Linux commands. Assigning camera by index." + ) + for idx in range( + 20 + ): # an arbitrarily high number to make sure we check for enough cams if idx in GenericUSBFrameGrabber.indices_in_use: continue # Camera is already in use, moving on @@ -622,7 +666,9 @@ def __init__(self, config: dict): break # Found a valid capture else: - raise ValueError("Unable to connect to USB camera by index. Is your camera plugged in?") + raise ValueError( + "Unable to connect to USB camera by index. Is your camera plugged in?" + ) # If a serial_number wasn't provided by the user, attempt to find it and add it to the config if not serial_number: @@ -645,13 +691,17 @@ def _has_ir_camera(self, camera_name: str) -> bool: such as Windows Hello. These cameras are not suitable for use in the context of most applications, so we will exclude them. """ - cameras_with_ir = ["logitech brio"] # we can add to this list as we discover more cameras with IR + cameras_with_ir = [ + "logitech brio" + ] # we can add to this list as we discover more cameras with IR for i in cameras_with_ir: if i in camera_name.lower(): return True return False - def _connect_and_validate_capture(self, camera_details: Dict[str, str]) -> Union[cv2.VideoCapture, None]: + def _connect_and_validate_capture( + self, camera_details: Dict[str, str] + ) -> Union[cv2.VideoCapture, None]: """Connect to the camera, check that it is open and not an IR camera. Return the camera if it is valid, otherwise return None. @@ -713,7 +763,9 @@ def _apply_camera_specific_options(self, options: dict) -> None: @staticmethod def _run_system_command(command: str) -> str: """Runs a Linux system command and returns the stdout as a string.""" - process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True + ) stdout, _ = process.communicate() return stdout.decode("utf-8").strip() @@ -791,7 +843,9 @@ def __init__(self, config: dict): self.lock = Lock() self.run = True - self.keep_connection_open = config.get("options", {}).get("keep_connection_open", True) + self.keep_connection_open = config.get("options", {}).get( + "keep_connection_open", True + ) if self.keep_connection_open: self._open_connection() @@ -815,14 +869,20 @@ def _substitute_rtsp_password(config: dict) -> dict: matches = re.findall(pattern, rtsp_url) if len(matches) == 0: - return config # make no change to config if no password placeholder is found + return ( + config # make no change to config if no password placeholder is found + ) elif len(matches) > 1: - raise ValueError("RTSP URL should contain no more than one placeholder for the password.") + raise ValueError( + "RTSP URL should contain no more than one placeholder for the password." + ) match = matches[0] password_env_var = os.environ.get(match) if not password_env_var: - raise ValueError(f"RTSP URL {rtsp_url} references environment variable {match} which is not set") + raise ValueError( + f"RTSP URL {rtsp_url} references environment variable {match} which is not set" + ) placeholder = "{{" + match + "}}" rtsp_url = rtsp_url.replace(placeholder, password_env_var) @@ -833,7 +893,9 @@ def _substitute_rtsp_password(config: dict) -> dict: def _apply_camera_specific_options(self, options: dict) -> None: if options.get("resolution"): camera_name = self.config.get("name", "Unnamed RTSP Stream") - raise ValueError(f"Resolution was set for {camera_name}, but resolution cannot be set for RTSP streams.") + raise ValueError( + f"Resolution was set for {camera_name}, but resolution cannot be set for RTSP streams." + ) def _open_connection(self): self.capture = cv2.VideoCapture(self.rtsp_url) @@ -841,7 +903,9 @@ def _open_connection(self): raise ValueError( f"Could not open RTSP stream: {self.rtsp_url}. Is the RTSP URL correct? Is the camera connected to the network?" ) - logger.debug(f"Initialized video capture with backend={self.capture.getBackendName()}") + logger.debug( + f"Initialized video capture with backend={self.capture.getBackendName()}" + ) def _close_connection(self): with self.lock: @@ -860,7 +924,11 @@ def _grab_implementation(self) -> np.ndarray: def _grab_open(self) -> np.ndarray: with self.lock: - ret, frame = self.capture.retrieve() if self.keep_connection_open else self.capture.read() + ret, frame = ( + self.capture.retrieve() + if self.keep_connection_open + else self.capture.read() + ) if not ret: logger.error(f"Could not read frame from {self.capture}") return frame @@ -915,9 +983,13 @@ def __init__(self, config: dict): if curr_serial_number in BaslerFrameGrabber.serial_numbers_in_use: continue if serial_number is None or serial_number == curr_serial_number: - camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateDevice(device)) + camera = pylon.InstantCamera( + pylon.TlFactory.GetInstance().CreateDevice(device) + ) camera.Open() - logger.info(f"Connected to Basler camera with serial number {curr_serial_number}.") + logger.info( + f"Connected to Basler camera with serial number {curr_serial_number}." + ) break else: raise ValueError( @@ -965,12 +1037,16 @@ def _grab_implementation(self) -> np.ndarray: return frame def release(self) -> None: - BaslerFrameGrabber.serial_numbers_in_use.remove(self.config["id"]["serial_number"]) + BaslerFrameGrabber.serial_numbers_in_use.remove( + self.config["id"]["serial_number"] + ) self.camera.Close() def _apply_camera_specific_options(self, options: dict) -> None: if options.get("resolution"): - raise ValueError("FrameGrab does not support setting resolution on Basler cameras.") + raise ValueError( + "FrameGrab does not support setting resolution on Basler cameras." + ) basler_options = options.get("basler", {}) node_map = self.camera.GetNodeMap() @@ -987,7 +1063,9 @@ def __init__(self, config: dict): ctx = rs.context() if len(ctx.devices) == 0: - raise ValueError("No Intel RealSense cameras detected. Is your camera plugged in?") + raise ValueError( + "No Intel RealSense cameras detected. Is your camera plugged in?" + ) provided_serial_number = self.config.get("id", {}).get("serial_number") @@ -998,7 +1076,10 @@ def __init__(self, config: dict): curr_serial_number = device.get_info(rs.camera_info.serial_number) - if provided_serial_number is None or curr_serial_number == provided_serial_number: + if ( + provided_serial_number is None + or curr_serial_number == provided_serial_number + ): rs_config.enable_device(curr_serial_number) # Try to connect to the camera @@ -1026,10 +1107,14 @@ def _grab_implementation(self) -> np.ndarray: # Convert color images to numpy arrays and convert from RGB to BGR color_frame = frames.get_color_frame() - color_image = cv2.cvtColor(np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB) + color_image = cv2.cvtColor( + np.asanyarray(color_frame.get_data()), cv2.COLOR_BGR2RGB + ) # If side_by_side is enabled, get a depth frame and horizontally stack it with color frame - display_side_by_side = self.config.get("options", {}).get("depth", {}).get("side_by_side") + display_side_by_side = ( + self.config.get("options", {}).get("depth", {}).get("side_by_side") + ) if display_side_by_side: depth_frame = frames.get_depth_frame() depth_image = np.asanyarray(depth_frame.get_data()) @@ -1037,11 +1122,15 @@ def _grab_implementation(self) -> np.ndarray: else: return color_image - def _horizontally_stack(self, depth_image: np.ndarray, color_image: np.ndarray) -> np.ndarray: + def _horizontally_stack( + self, depth_image: np.ndarray, color_image: np.ndarray + ) -> np.ndarray: """Merges color image and depth image into a wider image, all in RGB""" # Apply colormap on depth image (image must be converted to 8-bit per pixel first) - depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_BONE) + depth_colormap = cv2.applyColorMap( + cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_BONE + ) depth_colormap_dim = depth_colormap.shape color_colormap_dim = color_image.shape @@ -1073,7 +1162,9 @@ def _apply_camera_specific_options(self, options: dict) -> None: self.pipeline.stop() # pipeline needs to be temporarily stopped in order to change the resolution self.rs_config.enable_stream(rs.stream.color, new_width, new_height) self.rs_config.enable_stream(rs.stream.depth, new_width, new_height) - self.pipeline.start(self.rs_config) # Restart the pipeline with the new configuration + self.pipeline.start( + self.rs_config + ) # Restart the pipeline with the new configuration else: # If the user didn't provide a resolution, do nothing pass @@ -1100,7 +1191,9 @@ def __init__(self, config: dict): picam2 = Picamera2() picam2.configure(picam2.create_still_configuration()) picam2.start() - logger.info(f"Connected to Raspberry Pi CSI2 camera with id {(cameras[0])['Id']}") + logger.info( + f"Connected to Raspberry Pi CSI2 camera with id {(cameras[0])['Id']}" + ) self.camera = picam2 @@ -1114,7 +1207,9 @@ def _grab_implementation(self) -> np.ndarray: def _apply_camera_specific_options(self, options: dict) -> None: if options.get("resolution"): - raise ValueError("FrameGrab does not support setting resolution on Raspberry Pi CSI2 cameras.") + raise ValueError( + "FrameGrab does not support setting resolution on Raspberry Pi CSI2 cameras." + ) def release(self) -> None: self.camera.close() @@ -1123,9 +1218,10 @@ def release(self) -> None: class HttpLiveStreamingFrameGrabber(FrameGrabber): """Handles Http Live Streaming (HLS) - Opens and closes the connection on every captured frame, which conserves - both CPU and network bandwidth but has higher latency. In practice, roughly - ~0.5 FPS is achievable with this strategy. + Supports two modes: + 1. Keep connection open (default): Opens the connection once and keeps it open for high-fps frame grabbing. + 2. Open connection on every frame: Opens and closes the connection on every captured frame, which conserves + both CPU and network bandwidth but has higher latency. In practice, roughly 1FPS is achievable with this strategy. """ def __init__(self, config: dict): @@ -1141,6 +1237,12 @@ def __init__(self, config: dict): self.hls_url = self.config["id"]["hls_url"] self.lock = Lock() + self.keep_connection_open = config.get("options", {}).get( + "keep_connection_open", True + ) + + if self.keep_connection_open: + self._open_connection() def _apply_camera_specific_options(self, options: dict) -> None: if options.get("resolution"): @@ -1152,20 +1254,28 @@ def _apply_camera_specific_options(self, options: dict) -> None: def _open_connection(self): self.capture = cv2.VideoCapture(self.hls_url) if not self.capture.isOpened(): - raise ValueError(f"Could not open {self.type} stream: {self.hls_url}. Is the HLS URL correct?") - logger.debug(f"Initialized video capture with backend={self.capture.getBackendName()}") + raise ValueError( + f"Could not open {self.type} stream: {self.hls_url}. Is the HLS URL correct?" + ) + logger.warning( + f"Initialized video capture with backend={self.capture.getBackendName()}" + ) def _close_connection(self): + logger.warning(f"Closing connection to {self.type} stream") with self.lock: if self.capture is not None: self.capture.release() def _grab_implementation(self) -> np.ndarray: - self._open_connection() - try: + if not self.keep_connection_open: + self._open_connection() + try: + return self._grab_open() + finally: + self._close_connection() + else: return self._grab_open() - finally: - self._close_connection() def _grab_open(self) -> np.ndarray: with self.lock: @@ -1175,15 +1285,17 @@ def _grab_open(self) -> np.ndarray: return frame def release(self) -> None: - pass + if self.keep_connection_open: + self._close_connection() class YouTubeLiveFrameGrabber(HttpLiveStreamingFrameGrabber): """Grabs the most recent frame from a YouTube Live stream (which are HLS streams under the hood) - Opens and closes the connection on every captured frame, which conserves - both CPU and network bandwidth but has higher latency. In practice, roughly - ~0.5 FPS is achievable with this strategy. + Supports two modes: + 1. Keep connection open (default): Opens the connection once and keeps it open for high-fps frame grabbing. + 2. Open connection on every frame: Opens and closes the connection on every captured frame, which conserves + both CPU and network bandwidth but has higher latency. In practice, roughly 1FPS is achievable with this strategy. """ def __init__(self, config: dict): @@ -1199,12 +1311,20 @@ def __init__(self, config: dict): self.config = config self.lock = Lock() + self.keep_connection_open = config.get("options", {}).get( + "keep_connection_open", True + ) + + if self.keep_connection_open: + self._open_connection() def _extract_hls_url(self, youtube_url: str) -> str: """Extracts the HLS URL from a YouTube Live URL.""" available_streams = streamlink.streams(youtube_url) if "best" not in available_streams: - raise ValueError(f"No available HLS stream for {youtube_url=}\n{available_streams=}") + raise ValueError( + f"No available HLS stream for {youtube_url=}\n{available_streams=}" + ) return available_streams["best"].url @@ -1226,7 +1346,10 @@ def __init__(self, config: dict): for curr_serial_number in MockFrameGrabber.available_serial_numbers: if curr_serial_number in MockFrameGrabber.serial_numbers_in_use: continue # this camera is already in use, moving on to the next - if provided_serial_number is None or curr_serial_number == provided_serial_number: + if ( + provided_serial_number is None + or curr_serial_number == provided_serial_number + ): break # succesfully connected, breaking out of loop else: raise ValueError( @@ -1246,7 +1369,9 @@ def _grab_implementation(self) -> np.ndarray: return np.zeros((height, width, 3), dtype=np.uint8) def release(self) -> None: - MockFrameGrabber.serial_numbers_in_use.remove(self.config["id"]["serial_number"]) + MockFrameGrabber.serial_numbers_in_use.remove( + self.config["id"]["serial_number"] + ) def _apply_camera_specific_options(self, options: dict) -> None: pass # no action necessary for mock cameras