diff --git a/cereja/__init__.py b/cereja/__init__.py index fad3dba..fc86e33 100644 --- a/cereja/__init__.py +++ b/cereja/__init__.py @@ -48,7 +48,7 @@ from ._requests import request from . import scraping -VERSION = "2.0.0.final.1" +VERSION = "2.0.1.final.0" __version__ = get_version_pep440_compliant(VERSION) diff --git a/cereja/array/_array.py b/cereja/array/_array.py index e0e9cb7..7bcbd5d 100644 --- a/cereja/array/_array.py +++ b/cereja/array/_array.py @@ -50,6 +50,8 @@ "sub", "prod", "reshape", + "get_min_max", + "apply_proportional_mask", ] from ..utils import is_iterable, is_sequence, is_numeric_sequence, chunk, dict_to_tuple @@ -480,6 +482,77 @@ def dot(a, b): return [[dotproduct(line, col) for col in get_cols(b)] for line in a] +def get_min_max(values: List[Any]) -> Tuple[Any, ...]: + if not values or len(values) == 0: + raise ValueError("values must have at least one element") + + result = [] + try: + shape = get_shape(values) + if len(shape) > 1: + values_ = flatten(values, depth=len(shape) - 2) + for i in range(shape[-1]): + result.append((min(values_, key=lambda val: val[i])[i], max(values_, key=lambda val: val[i])[i])) + return tuple(result) + return min(values), max(values) + except Exception as err: + raise ValueError(f"Error when trying to get min and max values. {err}") + + +def apply_proportional_mask( + positions: List[Tuple[int, int]], + mask_size: Tuple[int, int], + original_size: Optional[Tuple[int, int]] = None +) -> List[List[int]]: + """ + Applies a proportional mask to a list of positions and returns an array (list of lists) representing the mask, + where each index of the main list corresponds to a row. + + Args: + positions (List[Tuple[int, int]]): List of tuples representing positions (x, y) on the original scale. + mask_size (Tuple[int, int]): Mask size (columns, rows), where columns is the number of columns and rows is + the number of rows. + original_size (Optional[Tuple[int, int]]): Original image size (width, height). If not provided, it will be + calculated based on positions. + + Returns: + List[List[int]]: Matrix resulting from the proportional application of the mask, where each index represents a row. + """ + + # If original_size is not given, calculate based on positions + min_x, max_x, min_y, max_y = 0, 0, 0, 0 + if original_size is None: + (min_x, max_x), (min_y, max_y) = get_min_max(positions) + original_width = max_x - min_x + original_height = max_y - min_y + else: + original_width, original_height = original_size + + mask_cols, mask_rows = mask_size + + # Initialize the array with zeros + matrix = [[0 for _ in range(mask_cols)] for _ in range(mask_rows)] + + # Calculate scale factors for columns and rows + scale_x = mask_cols / original_width + scale_y = mask_rows / original_height + + # Fill the matrix based on the scaled positions + for x, y in positions: + # Adjust position to new coordinate system if necessary + if original_size is None: + x -= min_x + y -= min_y + + scaled_x = int(x * scale_x) + scaled_y = int(y * scale_y) + + if 0 <= scaled_x < mask_cols and 0 <= scaled_y < mask_rows: + matrix[scaled_y][scaled_x] = 1 + + return matrix + + class Matrix(object): """ Matrix is ​​a tool similar to the numpy array diff --git a/cereja/concurrently/__init__.py b/cereja/concurrently/__init__.py index 6d7a44a..bd07c03 100644 --- a/cereja/concurrently/__init__.py +++ b/cereja/concurrently/__init__.py @@ -20,4 +20,4 @@ SOFTWARE. """ from ._concurrence import TaskList, sync_to_async, async_to_sync -from .process import MultiProcess +from .process import MultiProcess, Processor diff --git a/cereja/concurrently/process.py b/cereja/concurrently/process.py index 7cb2b75..8b763b0 100644 --- a/cereja/concurrently/process.py +++ b/cereja/concurrently/process.py @@ -325,8 +325,12 @@ def process(self, func, data, *args, **kwargs): if elapsed_time < self.interval_seconds: time.sleep(self.interval_seconds - elapsed_time) if self.in_progress_count >= self._max_in_progress: - print(f"O Total de dados sendo processado {self.in_progress_count} é maior que o predefinido {self._max_in_progress}") - time.sleep(10) + print( + f"O Total de dados sendo processado {self.in_progress_count} é maior que o predefinido {self._max_in_progress}") + while self.in_progress_count >= self._max_in_progress * 0.9: + time.sleep(0.05) + if self.stopped: + break self.stop_process() @@ -345,4 +349,4 @@ def restart_process(self): self._started_at = time.time() self._failure_data = [] self._total_success = 0 - self._create_process_result_service().start() \ No newline at end of file + self._create_process_result_service().start() diff --git a/cereja/display/_display.py b/cereja/display/_display.py index 4277b58..f5d38e9 100644 --- a/cereja/display/_display.py +++ b/cereja/display/_display.py @@ -208,6 +208,7 @@ def restore_sys_module_state(self): class _ConsoleBase(metaclass=ABC): + _instance = None NON_BMP_MAP = dict.fromkeys(range(0x10000, sys.maxunicode + 1), 0xFFFD) DONE_UNICODE = "\U00002705" ERROR_UNICODE = "\U0000274C" @@ -251,6 +252,11 @@ def __init__(self, color_text: str = "default"): self.text_color = color_text self._stdout = _Stdout(self) + def __new__(cls): + if cls._instance is None: + cls._instance = super(_ConsoleBase, cls).__new__(cls) + return cls._instance + @property def non_bmp_supported(self): from cereja import NON_BMP_SUPPORTED @@ -718,7 +724,7 @@ def __init__( self, sequence=None, name="Progress", - max_value: int = 100, + max_value: int = None, states=("value", "bar", "percent", "time"), custom_state_func=None, custom_state_name=None, @@ -757,10 +763,15 @@ def __init__( def name(self): return self._name + @property + def max_value(self): + return self._max_value or self._current_value + 1 + def set_name(self, value: str): if not isinstance(value, str): raise TypeError("Please send string.") self._name = value + self._console.set_prefix(f"{self._name}") def _create_progress_service(self): return threading.Thread( @@ -768,7 +779,7 @@ def _create_progress_service(self): ) def __repr__(self): - state = self._states_view(self._max_value) + state = self._states_view(self.max_value) progress_example_view = f"{state}" state_conf = f"{self.__class__.__name__}{self._parse_states()}" return f"{state_conf}\n{self._console.parse(progress_example_view, title='Example States View')}" @@ -821,7 +832,7 @@ def _states_view(self, for_value: Number) -> str: self._n_times += 1 kwargs = { "current_value": for_value, - "max_value": self._max_value, + "max_value": self.max_value, "current_percent": self.percent_(for_value), "time_it": self.time_it, "n_times": self._n_times, @@ -889,7 +900,7 @@ def states(self): return self._parse_states() def percent_(self, for_value: Number) -> Number: - return percent(for_value, self._max_value) + return percent(for_value, self.max_value) def update_max_value(self, max_value: int): """ @@ -900,7 +911,7 @@ def update_max_value(self, max_value: int): """ if not isinstance(max_value, (int, float, complex)): raise Exception(f"Current value {max_value} isn't valid.") - if max_value != self._max_value: + if max_value != self.max_value: self._max_value = max_value def _progress_service(self): @@ -917,14 +928,14 @@ def _progress_service(self): ) ) time.sleep(0.5) - if not self._awaiting_update or self._show: + if not self._awaiting_update or (self._show and self._max_value is not None): self._show_progress(self._current_value) last_value = self._current_value time.sleep(0.01) def _show_progress(self, for_value=None): self._awaiting_update = False - build_progress = self._states_view(for_value) + build_progress = self._states_view(for_value or self._current_value) self._console.replace_last_msg( build_progress, end="\n" if self._iter_finaly else None ) @@ -986,16 +997,18 @@ def prog( cls, sequence: Sequence[Any], name: str = None, + max_value: int = None, states=("value", "bar", "percent", "time"), custom_state_func=None, custom_state_name=None, ) -> "Progress": return cls( name=name, + max_value=max_value, states=states, custom_state_func=custom_state_func, custom_state_name=custom_state_name, - )(sequence) + )(sequence, name, max_value) def __len__(self): return len(self._states) @@ -1013,11 +1026,13 @@ def __getitem__(self, slice_): raise KeyError(f"Not exists {key}") return self._states[slice_] - def __call__(self, sequence: Sequence, name=None) -> "Progress": + def __call__(self, sequence: Sequence, name=None, max_value=None) -> "Progress": if not is_iterable(sequence): raise ValueError("Send a sequence.") if has_length(sequence): self.update_max_value(len(sequence)) + elif max_value is not None: + self.update_max_value(max_value) else: self._is_generator = True self.sequence = sequence @@ -1030,6 +1045,7 @@ def __call__(self, sequence: Sequence, name=None) -> "Progress": return self def __next__(self): + original_name = self._name if not self._with_context: self.start() try: @@ -1041,13 +1057,13 @@ def __next__(self): finally: if self._is_generator: self.update_max_value(self._current_value) - self._was_done = (self._current_value >= self._max_value) and not self._err + self._was_done = (self._current_value >= self.max_value) and not self._err if not self._with_context: self.stop() self._iter_finaly = True self._show_progress(self._current_value) - self._console.set_prefix(self.name) + self._console.set_prefix(original_name) self.sequence = () def __iter__(self): diff --git a/cereja/geolinear/__init__.py b/cereja/geolinear/__init__.py index f3b38a7..9591224 100644 --- a/cereja/geolinear/__init__.py +++ b/cereja/geolinear/__init__.py @@ -1,2 +1,2 @@ from .point import Point -from .utils import Rotation +from .utils import Rotation, find_best_locations diff --git a/cereja/geolinear/utils.py b/cereja/geolinear/utils.py index 7c37eb3..0033e12 100644 --- a/cereja/geolinear/utils.py +++ b/cereja/geolinear/utils.py @@ -1,6 +1,6 @@ import math -__all__ = ["Rotation"] +__all__ = ["Rotation", "find_best_locations"] from typing import Union @@ -74,3 +74,61 @@ def rotate(self, val, axis=None): return reshape(list(map(lambda point: self.rotate_point(point, axis=axis), flatten(val, depth=len(shape) - 2))), shape) + + +def find_best_locations(locations, min_distance=1): + """ + Finds the best locations that respect a minimum distance between points. + + Args: + locations (list of tuple): A list of (x, y) tuples representing the locations. + min_distance (int, optional): The minimum distance in pixels between each point. Default is 1. + + Returns: + list of tuple: A list of (x, y) tuples representing the best locations. + """ + + min_distance_squared = min_distance ** 2 + selected_locations = [] + + for current_location in locations: + is_valid = True + for existing_location in selected_locations: + distance_squared = ( + (current_location[0] - existing_location[0]) ** 2 + + (current_location[1] - existing_location[1]) ** 2 + ) + if distance_squared < min_distance_squared: + is_valid = False + break + if is_valid: + selected_locations.append(current_location) + + return selected_locations + + +def group_locations_into_rows(locations, max_distance=1): + """ + Groups locations into rows based on their y-coordinates. + + Args: + locations (list of tuple): A list of (x, y) tuples representing the locations. + max_distance (int, optional): The maximum distance in pixels between each point. Default is 1. + + Returns: + list of list of tuple: A list of lists of (x, y) tuples representing the grouped locations. + """ + + rows = [] + + for location in locations: + is_added = False + for row in rows: + if all(abs(location[1] - row_location[1]) <= max_distance for row_location in row): + row.append(location) + is_added = True + break + if not is_added: + rows.append([location]) + + return rows diff --git a/cereja/utils/__init__.py b/cereja/utils/__init__.py index b4dff38..def9aa2 100644 --- a/cereja/utils/__init__.py +++ b/cereja/utils/__init__.py @@ -24,3 +24,4 @@ from . import colab # Aliases here from ._utils import get_batch_strides as stride_values +from . import colors diff --git a/cereja/utils/_utils.py b/cereja/utils/_utils.py index efc97d2..44b734a 100644 --- a/cereja/utils/_utils.py +++ b/cereja/utils/_utils.py @@ -92,7 +92,9 @@ "combinations", "combinations_sizes", "value_from_memory", - "str_gen" + "str_gen", + "set_interval", + "SourceCodeAnalyzer" ] logger = logging.getLogger(__name__) @@ -119,33 +121,49 @@ def is_indexable(v): return hasattr(v, "__getitem__") -def split_sequence(seq, is_break_fn): +def split_sequence(seq: List[Any], is_break_fn: Callable) -> List[List[Any]]: """ - Divide a sequence into sub-sequences at the point defined by a given is_break_fn function. + Split a sequence into subsequences based on a break function. + @param seq: sequence to split + @param is_break_fn: function that returns True if the sequence should be split at the current index + e.g: + >>> import cereja as cj + >>> seq = [1, 2, 3, 4, 5, 6, 7, 8, 9] + >>> cj.split_sequence(seq, lambda current_val: current_val % 3 == 0) + [[1, 2, 3], [4, 5, 6], [7, 8, 9]] + >>> cj.split_sequence(seq, lambda current_val, next_val: current_val % 2 == 0 and next_val % 3 == 0) + [[1, 2], [3, 4, 5, 6, 7, 8], [9]] + >>> cj.split_sequence(seq, lambda: True) + [[1], [2], [3], [4], [5], [6], [7], [8], [9]] + >>> cj.split_sequence(seq, lambda current_val, next_val, index: seq[index-1] % 2 == 0 and current_val % 3 == 0) + [[1, 2, 3], [4, 5, 6, 7, 8, 9]] + @return: list of subsequences + """ + if not isinstance(seq, list) or not seq: + raise ValueError("A sequência deve ser uma lista não vazia.") - Args: - seq (list): The sequence to be divided into sub-sequences. - is_break_fn (callable): A function that takes two elements from the sequence and - returns True if there is a break between them and False otherwise. + if not callable(is_break_fn): + raise TypeError("is_break_fn deve ser uma função.") - Returns: - list: A list of sub-sequences, each containing a sequence of consecutive elements. - - Examples: - >>> seq = [1, 2, 3, 4, 5, 2, 3, 4, 5, 6, 7, 8] - >>> is_even = lambda x, y: x % 2 == 0 and y % 2 == 0 - >>> sub_seqs = split_sequence(seq, is_even) - >>> sub_seqs - [[1, 2, 3, 4, 5], [2, 3, 4, 5, 6, 7, 8]] - """ + # Inicializa com a primeira subsequência sub_seqs = [] - sub_seq = [seq[0]] - for i in range(1, len(seq)): - if is_break_fn(seq[i - 1], seq[i]): - sub_seqs.append(sub_seq) - sub_seq = [] - sub_seq.append(seq[i]) - sub_seqs.append(sub_seq) + start_idx = 0 + break_fn_arg_count = SourceCodeAnalyzer(is_break_fn).argument_count + for indx, val in enumerate(seq): + _args = None + if indx + 1 == len(seq): + sub_seqs.append(seq[start_idx:]) + break + if break_fn_arg_count == 1: + _args = (val,) + elif break_fn_arg_count == 2: + _args = (val, seq[indx + 1]) + elif break_fn_arg_count == 3: + _args = (val, seq[indx + 1], indx) + + if is_break_fn(*_args) if _args else is_break_fn(): + sub_seqs.append(seq[start_idx:indx+1]) + start_idx = indx+1 return sub_seqs @@ -258,6 +276,7 @@ def truncate(data: Union[Sequence], k_iter: int = 0, k_str: int = 0, k_dict_keys assert all(isinstance(k, int) and k >= 0 for k in (k_iter, k_str, k_dict_keys)), 'k parameters should be an integer equal to or larger than 0' + data = copy(data) if isinstance(data, dict): if k_dict_keys: data = {key: data.get(key, '<…>') for key in truncate(list(data.keys()), k_dict_keys)} @@ -1169,34 +1188,84 @@ def rescale_values( return result -class Source: +class SourceCodeAnalyzer: def __init__(self, reference: Any): - self._name = None - self._doc = inspect.getdoc(reference) - self._source_code = inspect.getsource(reference) - if hasattr(reference, "__name__"): - self._name = reference.__name__ + self._reference = reference + self._name: Optional[str] = None + self._doc: Optional[str] = None + self._source_code: Optional[str] = None @property - def source_code(self): - return self._source_code.lstrip() + def source_code(self) -> str: + if self._source_code is None: + self._source_code = inspect.getsource(self._reference).lstrip() + return self._source_code @property - def name(self): + def name(self) -> str: + if self._name is None: + if hasattr(self._reference, "__name__"): + self._name = self._reference.__name__ + else: + self._name = self._reference.__class__.__name__ return self._name @property - def doc(self): + def has_arguments(self) -> bool: + return bool(inspect.signature(self._reference).parameters) + + @property + def arguments(self): + return inspect.signature(self._reference).parameters + + @property + def argument_names(self) -> list: + return list(self.arguments.keys()) + + @property + def argument_defaults(self) -> dict: + return {k: v.default for k, v in self.arguments.items() if v.default != inspect.Parameter.empty} + + @property + def argument_count(self) -> int: + return len(self.arguments) + + @property + def required_arguments(self) -> list: + return [k for k, v in self.arguments.items() if v.default == inspect.Parameter.empty] + + @property + def optional_arguments(self) -> list: + return [k for k, v in self.arguments.items() if v.default != inspect.Parameter.empty] + + @property + def has_kwargs(self) -> bool: + return any(v.kind == inspect.Parameter.VAR_KEYWORD for v in self.arguments.values()) + + @property + def has_varargs(self) -> bool: + return any(v.kind == inspect.Parameter.VAR_POSITIONAL for v in self.arguments.values()) + + @property + def docstring(self) -> Optional[str]: + if self._doc is None: + self._doc = inspect.getdoc(self._reference) return self._doc - def save(self, path_, **kwargs): + def save_source_code(self, path: str, **kwargs): from cereja import FileIO, Path - path_ = Path(path_) - if path_.is_dir: - path_ = path_.join(f"{self.name}.py") - assert path_.suffix == ".py", "Only python source code." - FileIO.create(path_, self._source_code).save(**kwargs) + path = Path(path) + if path.is_dir: + path = path.join(f"{self.name}.py") + assert path.suffix == ".py", "Only Python source code is allowed." + FileIO.create(path, self.source_code).save(**kwargs) + + +@depreciation(alternative="SourceCodeAnalyzer") +class Source(SourceCodeAnalyzer): + def __init__(self, reference: Any): + super().__init__(reference) def is_iterable(obj: Any) -> bool: @@ -1486,3 +1555,13 @@ def prune_values(values: Sequence, factor=2): def str_gen(pattern: AnyStr) -> Sequence[AnyStr]: regex = re.compile(pattern) return regex.findall(string.printable) + + +def set_interval(func: Callable, sec: float): + """ + Call a function every sec seconds + @param func: function + @param sec: seconds + """ + from .decorators import on_elapsed + on_elapsed(sec, loop=True, use_threading=True)(func)() diff --git a/cereja/utils/colors/__init__.py b/cereja/utils/colors/__init__.py new file mode 100644 index 0000000..1e5e450 --- /dev/null +++ b/cereja/utils/colors/__init__.py @@ -0,0 +1,2 @@ +from .converters import * +from ._color import Color diff --git a/cereja/utils/colors/_color.py b/cereja/utils/colors/_color.py new file mode 100644 index 0000000..4a54de4 --- /dev/null +++ b/cereja/utils/colors/_color.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from typing import Any + +from . import converters as _converters + +__all__ = ['Color'] + + +class Color: + def __init__(self, red, green, blue, alpha=None): + self.red = red + self.green = green + self.blue = blue + self.alpha = alpha if alpha is not None else 255 + self._is_rgba = alpha is not None + + @property + def is_rgba(self): + return self._is_rgba + + @property + def rgba(self): + return self.red, self.green, self.blue, self.alpha + + @property + def rgb(self): + return self.red, self.green, self.blue + + @property + def hex(self): + return _converters.rgb_to_hex(self.red, self.green, self.blue) + + @property + def hsl(self): + return _converters.rgb_to_hsl(self.red, self.green, self.blue) + + @property + def hsv(self): + return _converters.rgb_to_hsv(self.red, self.green, self.blue) + + @property + def cmyk(self): + return _converters.rgb_to_cmyk(self.red, self.green, self.blue) + + @classmethod + def from_hex(cls, hex_value): + val_parsed = _converters.parse_hex(hex_value) + if len(val_parsed) == 3: + r, g, b = val_parsed + return cls(r, g, b) + return cls(*val_parsed) + + @classmethod + def parse(cls, color: Any): + if isinstance(color, Color): + return color + if isinstance(color, str): + return cls.from_hex(color) + if isinstance(color, tuple): + if len(color) == 3: + return cls(*color) + if len(color) == 4: + return cls(*color[:3], alpha=color[3]) + raise ValueError("Invalid color") + + def interpolate(self, other, factor): + other = self.parse(other) + r = self.red + factor * (other.red - self.red) + g = self.green + factor * (other.green - self.green) + b = self.blue + factor * (other.blue - self.blue) + if self.is_rgba and other.is_rgba: + a = self.alpha + factor * (other.alpha - self.alpha) + else: + a = None + return Color(r, g, b, a) + + def generate_gradient(self, other, steps): + """ + Generate a gradient between two colors. + @param other: color to interpolate with + @param steps: number of steps in the gradient + @return: list of colors + """ + return [self.interpolate(other, i / steps) for i in range(steps)] + + def __eq__(self, other): + return self.rgb == self.parse(other) + + def __ne__(self, other): + return not self.__eq__(self.parse(other)) + + def __hash__(self): + return hash(self.rgba) + + def __getitem__(self, item): + if item == 0: + return self.red + if item == 1: + return self.green + if item == 2: + return self.blue + if item == 3: + return self.alpha + raise IndexError("Invalid index") + + def __setitem__(self, key, value): + if key == 0: + self.red = value + elif key == 1: + self.green = value + elif key == 2: + self.blue = value + elif key == 3: + self.alpha = value + self._is_rgba = True + else: + raise IndexError("Invalid index") + + def __iter__(self): + return iter(self.rgba) if self.is_rgba else iter(self.rgb) + + def __len__(self): + return 4 if self.is_rgba else 3 + + def __repr__(self): + if self.is_rgba: + return f"Color(R={self.red}, G={self.green}, B={self.blue}, A={self.alpha})" + else: + return f"Color(R={self.red}, G={self.green}, B={self.blue})" diff --git a/cereja/utils/colors/converters.py b/cereja/utils/colors/converters.py new file mode 100644 index 0000000..8ed2226 --- /dev/null +++ b/cereja/utils/colors/converters.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +__all__ = ["rgb_to_hex", "hex_to_rgb", "rgb_to_hsl", "hsl_to_rgb", "rgb_to_hsv", "hsv_to_rgb", "rgb_to_cmyk", + "cmyk_to_rgb"] + + +def parse_hex(hex_value): + hex_value = hex_value.lstrip("#") + length = len(hex_value) + if length == 3: + r = int(hex_value[0] * 2, 16) + g = int(hex_value[1] * 2, 16) + b = int(hex_value[2] * 2, 16) + elif length == 4: + # RGBA + r = int(hex_value[0] * 2, 16) + g = int(hex_value[1] * 2, 16) + b = int(hex_value[2] * 2, 16) + a = int(hex_value[3] * 2, 16) + return r, g, b, a + elif length == 6: + r = int(hex_value[:2], 16) + g = int(hex_value[2:4], 16) + b = int(hex_value[4:], 16) + elif length == 8: + # RGBA + r = int(hex_value[:2], 16) + g = int(hex_value[2:4], 16) + b = int(hex_value[4:6], 16) + a = int(hex_value[6:], 16) + return r, g, b, a + else: + raise ValueError(f"Invalid hex color: {hex_value}") + return r, g, b + + +def rgb_to_hex(r, g, b): + return "#{:02x}{:02x}{:02x}".format(r, g, b) + + +def rgba_to_hex(r, g, b, a): + return "#{:02x}{:02x}{:02x}{:02x}".format(r, g, b, a) + + +def hex_to_rgb(hex_value): + value = parse_hex(hex_value) + return value[:3] + + +def hex_to_rgba(hex_value): + value = parse_hex(hex_value) + if len(value) == 3: + return value + (255,) + return value + + +def rgb_to_hsl(r, g, b): + r /= 255.0 + g /= 255.0 + b /= 255.0 + max_c = max(r, g, b) + min_c = min(r, g, b) + l = (max_c + min_c) / 2.0 + if max_c == min_c: + s = h = 0.0 + else: + delta = max_c - min_c + s = delta / (2.0 - max_c - min_c) if l > 0.5 else delta / (max_c + min_c) + if max_c == r: + h = (g - b) / delta + (g < b) * 6.0 + elif max_c == g: + h = (b - r) / delta + 2.0 + elif max_c == b: + h = (r - g) / delta + 4.0 + h /= 6.0 + return h * 360.0, s * 100.0, l * 100.0 + + +def hsl_to_rgb(h, s, l): + s /= 100.0 + l /= 100.0 + c = (1.0 - abs(2 * l - 1.0)) * s + h /= 60.0 + x = c * (1.0 - abs(h % 2 - 1.0)) + if 0 <= h < 1: + r, g, b = c, x, 0 + elif 1 <= h < 2: + r, g, b = x, c, 0 + elif 2 <= h < 3: + r, g, b = 0, c, x + elif 3 <= h < 4: + r, g, b = 0, x, c + elif 4 <= h < 5: + r, g, b = x, 0, c + elif 5 <= h < 6: + r, g, b = c, 0, x + m = l - c / 2.0 + return int((r + m) * 255), int((g + m) * 255), int((b + m) * 255) + + +def hsl_to_rgba(h, s, l): + r, g, b = hsl_to_rgb(h, s, l) + return r, g, b, 255 + + +def rgb_to_hsv(r, g, b): + r, g, b = r / 255.0, g / 255.0, b / 255.0 + max_c = max(r, g, b) + min_c = min(r, g, b) + delta = max_c - min_c + + if delta == 0: + h = 0 + elif max_c == r: + h = (g - b) / delta % 6 + elif max_c == g: + h = (b - r) / delta + 2 + elif max_c == b: + h = (r - g) / delta + 4 + h = int(h * 60) + if h < 0: + h += 360 + + s = 0 if max_c == 0 else delta / max_c + v = max_c + + return h, int(s * 100), int(v * 100) + + +def hsv_to_rgb(h, s, v): + s /= 100 + v /= 100 + c = v * s + x = c * (1 - abs((h / 60) % 2 - 1)) + m = v - c + + if 0 <= h < 60: + r, g, b = c, x, 0 + elif 60 <= h < 120: + r, g, b = x, c, 0 + elif 120 <= h < 180: + r, g, b = 0, c, x + elif 180 <= h < 240: + r, g, b = 0, x, c + elif 240 <= h < 300: + r, g, b = x, 0, c + else: + r, g, b = c, 0, x + + return int((r + m) * 255), int((g + m) * 255), int((b + m) * 255) + + +def hsv_to_rgba(h, s, v): + r, g, b = hsv_to_rgb(h, s, v) + return r, g, b, 255 + + +def rgb_to_cmyk(r, g, b): + if (r == 0) and (g == 0) and (b == 0): + return 0, 0, 0, 100 + c = 1 - r / 255.0 + m = 1 - g / 255.0 + y = 1 - b / 255.0 + k = min(c, m, y) + c = (c - k) / (1 - k) + m = (m - k) / (1 - k) + y = (y - k) / (1 - k) + return int(c * 100), int(m * 100), int(y * 100), int(k * 100) + + +def cmyk_to_rgb(c, m, y, k): + r = 255 * (1 - c / 100) * (1 - k / 100) + g = 255 * (1 - m / 100) * (1 - k / 100) + b = 255 * (1 - y / 100) * (1 - k / 100) + return int(r), int(g), int(b) + + +def cmyk_to_rgba(c, m, y, k): + r, g, b = cmyk_to_rgb(c, m, y, k) + return r, g, b, 255 diff --git a/cereja/utils/decorators.py b/cereja/utils/decorators.py index d2a9610..bd29f0d 100644 --- a/cereja/utils/decorators.py +++ b/cereja/utils/decorators.py @@ -37,6 +37,7 @@ "singleton", "on_except", "use_thread", + "on_elapsed", ] from ..config.cj_types import PEP440 @@ -58,13 +59,12 @@ def synced_func(*args, **kws): def use_thread(func): - from .. import Thread - - def wrapper(*args, **kwargs): - th = Thread(target=func, args=args, kwargs=kwargs, daemon=True) + def threaded_func(*args, **kws): + th = threading.Thread(target=func, args=args, kwargs=kws) th.start() + return th - return wrapper + return threaded_func class _ThreadSafeIterator: @@ -201,3 +201,62 @@ def instance(*args, **kwargs): return instances[cls] return instance + + +def on_elapsed(interval: float = 1, + loop: bool = False, + use_threading: bool = False, + verbose: bool = False, + is_daemon: bool = False): + """ + Run a function if the interval has elapsed + + @param interval: Interval in seconds + @param loop: If True, the function will be executed in a loop + @param verbose: If True, the function name will be printed + @param use_threading: If True, the function will be executed in a thread + @param is_daemon: If True, the thread will be a daemon + """ + + def decorator(func: Callable): + last_time = 0.0 + + def wrapper(*args, **kwargs): + nonlocal last_time + if loop: + def run(): + nonlocal last_time + while True: + current_time = time.time() + if current_time - last_time >= interval: + if verbose: + print(f"Running {func.__name__}") + last_time = current_time + func(*args, **kwargs) + + if use_threading: + import threading + th = threading.Thread(target=run, daemon=is_daemon) + th.start() + else: + run() + else: + def run(): + nonlocal last_time + current_time = time.time() + if current_time - last_time >= interval: + if verbose: + print(f"Running {func.__name__}") + last_time = current_time + return func(*args, **kwargs) + + if use_threading: + import threading + th = threading.Thread(target=run, daemon=is_daemon) + th.start() + else: + return run() + + return wrapper + + return decorator diff --git a/tests/testscolors.py b/tests/testscolors.py new file mode 100644 index 0000000..ed4f5c8 --- /dev/null +++ b/tests/testscolors.py @@ -0,0 +1,82 @@ +import unittest +from cereja.utils.colors import rgb_to_hsl, hsl_to_rgb, rgb_to_hsv, hsv_to_rgb, rgb_to_cmyk, cmyk_to_rgb, Color + + +class TestColor(unittest.TestCase): + def test_to_hex(self): + color = Color(255, 0, 0) + self.assertEqual(color.hex, "#ff0000") + + def test_from_hex(self): + color = Color.from_hex("#ff0000") + self.assertEqual((color.red, color.green, color.blue), (255, 0, 0)) + + def test_to_hsl(self): + color = Color(255, 0, 0) + self.assertEqual(color.hsl, (0.0, 100.0, 50.0)) + + def test_to_hsv(self): + color = Color(255, 0, 0) + self.assertEqual(color.hsv, (0, 100, 100)) + + def test_to_cmyk(self): + color = Color(255, 0, 0) + self.assertEqual(color.cmyk, (0, 100, 100, 0)) + + def test_parse(self): + color = Color.parse("#ff0000") + self.assertEqual((color.red, color.green, color.blue), (255, 0, 0)) + color = Color.parse((255, 0, 0)) + self.assertEqual((color.red, color.green, color.blue), (255, 0, 0)) + color = Color.parse(Color(255, 0, 0)) + self.assertEqual((color.red, color.green, color.blue), (255, 0, 0)) + + def test_is_rgba(self): + color = Color(255, 0, 0) + self.assertFalse(color.is_rgba) + color = Color(255, 0, 0, 255) + self.assertTrue(color.is_rgba) + self.assertEqual(color.rgba, (255, 0, 0, 255)) + self.assertEqual(color.rgb, (255, 0, 0)) + + def test_to_rgba(self): + color = Color(255, 0, 0, 255) + self.assertEqual(color.rgba, (255, 0, 0, 255)) + self.assertEqual(color.rgb, (255, 0, 0)) + self.assertEqual(color.hex, "#ff0000") + self.assertEqual(color.hsl, (0.0, 100.0, 50.0)) + self.assertEqual(color.hsv, (0, 100, 100)) + self.assertEqual(color.cmyk, (0, 100, 100, 0)) + + +class TestConverters(unittest.TestCase): + + def test_rgb_to_hsl(self): + h, s, l = rgb_to_hsl(255, 0, 0) + self.assertAlmostEqual(h, 0.0) + self.assertAlmostEqual(s, 100.0) + self.assertAlmostEqual(l, 50.0) + + def test_hsl_to_rgb(self): + r, g, b = hsl_to_rgb(0.0, 100.0, 50.0) + self.assertEqual((r, g, b), (255, 0, 0)) + + def test_rgb_to_hsv(self): + h, s, v = rgb_to_hsv(255, 0, 0) + self.assertEqual((h, s, v), (0, 100, 100)) + + def test_hsv_to_rgb(self): + r, g, b = hsv_to_rgb(0, 100, 100) + self.assertEqual((r, g, b), (255, 0, 0)) + + def test_rgb_to_cmyk(self): + c, m, y, k = rgb_to_cmyk(255, 0, 0) + self.assertEqual((c, m, y, k), (0, 100, 100, 0)) + + def test_cmyk_to_rgb(self): + r, g, b = cmyk_to_rgb(0, 100, 100, 0) + self.assertEqual((r, g, b), (255, 0, 0)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/testsutils.py b/tests/testsutils.py index fbadebf..448e299 100644 --- a/tests/testsutils.py +++ b/tests/testsutils.py @@ -569,5 +569,69 @@ def test_time_exec(self): pass +class TestSplitSequence(unittest.TestCase): + + def test_basic_case(self): + seq = [1, 2, 3, 4, 5, 2, 2, 3, 4, 5, 6, 7, 8] + is_even = lambda x, y: x % 2 == 0 and y % 2 == 0 + result = utils.split_sequence(seq, is_even) + expected = [[1, 2, 3, 4, 5, 2], [2, 3, 4, 5, 6, 7, 8]] + self.assertEqual(result, expected) + + def test_no_breaks(self): + seq = [1, 2, 3, 4, 5] + is_never_break = lambda x, y: False + result = utils.split_sequence(seq, is_never_break) + expected = [seq] # Should return the entire sequence as a single list + self.assertEqual(result, expected) + + def test_all_breaks(self): + seq = [1, 2, 3, 4, 5] + is_always_break = lambda x, y: True + result = utils.split_sequence(seq, is_always_break) + expected = [[1], [2], [3], [4], [5]] # Each element should be its own subsequence + self.assertEqual(result, expected) + + def test_single_element(self): + seq = [42] + is_even = lambda x, y: x % 2 == 0 and y % 2 == 0 + result = utils.split_sequence(seq, is_even) + expected = [[42]] # A single element should return as a single subsequence + self.assertEqual(result, expected) + + def test_empty_sequence(self): + with self.assertRaises(ValueError): + utils.split_sequence([], lambda x, y: x < y) + + def test_non_callable_is_break_fn(self): + with self.assertRaises(TypeError): + utils.split_sequence([1, 2, 3], "not_a_function") + + def test_non_numeric_sequence(self): + seq = ["apple", "banana", "cherry", "date", "elderberry"] + # A quebra ocorre quando o segundo elemento (y) começa com uma vogal, + # e o primeiro elemento (x) não começa com uma vogal. + is_vowel_break = lambda x, y: y[0] in "aeiou" and x[0] not in "aeiou" + result = utils.split_sequence(seq, is_vowel_break) + expected = [["apple", "banana", "cherry", "date"], ["elderberry"]] + self.assertEqual(result, expected) + + def test_subsequences_with_custom_objects(self): + class Point: + def __init__(self, x, y): + self.x = x + self.y = y + + def __eq__(self, other): + return self.x == other.x and self.y == other.y + + seq = [Point(1, 2), Point(1, 3), Point(4, 4), Point(4, 8)] + is_same_x = lambda p1, p2: p1.x == p2.x + result = utils.split_sequence(seq, is_same_x) + # The expected result should be split based on x coordinate equality + expected = [[seq[0]], [seq[1], seq[2]], [seq[3]]] + self.assertEqual(result, expected) + + if __name__ == "__main__": unittest.main()