From 2732948e04f1fada0d34895a628db2b31ccc7dcd Mon Sep 17 00:00:00 2001 From: "Joab Leite S. Neto" Date: Thu, 15 Aug 2024 17:48:13 -0300 Subject: [PATCH] V2.0.0 (#209) * improves: add wait_key * improves: add wait_key * chore: add scraping package * refactor: rename CompanyDetails to Share * refactor: rename CompanyDetails to Share * refactor: add financial info * fix: fixed parser datetime * fix: fixed parser datetime --- cereja/__init__.py | 3 +- cereja/concurrently/process.py | 129 +++++++++++++++++++++- cereja/date/_datetime.py | 3 + cereja/geo/__init__.py | 0 cereja/geo/countries.py | 13 +++ cereja/geolinear/shapes.py | 188 +++++++++++++++++++++++++++++++++ cereja/scraping/__init__.py | 1 + cereja/scraping/_financial.py | 100 ++++++++++++++++++ cereja/scraping/b3.py | 148 ++++++++++++++++++++++++++ cereja/system/_win32.py | 19 +++- cereja/utils/_utils.py | 13 ++- 11 files changed, 608 insertions(+), 9 deletions(-) create mode 100644 cereja/geo/__init__.py create mode 100644 cereja/geo/countries.py create mode 100644 cereja/geolinear/shapes.py create mode 100644 cereja/scraping/__init__.py create mode 100644 cereja/scraping/_financial.py create mode 100644 cereja/scraping/b3.py diff --git a/cereja/__init__.py b/cereja/__init__.py index 832b366..1374cc2 100644 --- a/cereja/__init__.py +++ b/cereja/__init__.py @@ -46,8 +46,9 @@ from .mathtools import * from . import experimental from ._requests import request +from . import scraping -VERSION = "1.9.9.final.0" +VERSION = "2.0.0.final.0" __version__ = get_version_pep440_compliant(VERSION) diff --git a/cereja/concurrently/process.py b/cereja/concurrently/process.py index c1e2cc0..7cb2b75 100644 --- a/cereja/concurrently/process.py +++ b/cereja/concurrently/process.py @@ -3,12 +3,13 @@ import queue import threading import time +from concurrent.futures import ThreadPoolExecutor, as_completed from ..utils import decorators from .. import Progress, console -__all__ = ["MultiProcess"] +__all__ = ["MultiProcess", "Processor"] class _IMultiProcess(abc.ABC): @@ -219,3 +220,129 @@ def __exit__(self, exc_type, exc_val, exc_tb): ) self._q.join() self._with_context = False + + +class Processor: + def __init__(self, num_workers=None, max_in_progress=100, interval_seconds=None, use_progress=True, + on_result=None): + self._num_workers = num_workers if num_workers is not None else 10 + self._on_result = on_result + self._total_success = 0 + self._max_in_progress = max_in_progress + self._interval_seconds = 0 if interval_seconds is None else interval_seconds + self._process_result_service = None + self._future_to_data = set() + self._failure_data = [] + self._stopped = False + self._executor = None + self._started_at = 0 + self._progress = Progress(name="Processor", + max_value=100, + states=("value", "percent", "time"), + custom_state_func=self.get_status, + custom_state_name="Tx") if use_progress else None + + @property + def in_progress_count(self): + return len(self._future_to_data) + + @property + def total_processed(self): + return len(self._failure_data) + self._total_success + + @property + def interval_seconds(self): + return self._interval_seconds + + @property + def total_active_threads(self): + return threading.active_count() + + def _create_process_result_service(self): + if self._process_result_service is not None: + self._process_result_service.join() # Espera terminar se estiver em execução + self._process_result_service = threading.Thread(target=self._process_result, daemon=False) + return self._process_result_service + + def get_failure_data(self): + return self._failure_data + + def _process_result(self): + # Roda enquanto tiver dados aguardando retorno do processo de validação e atualização do banco + while not self.stopped or self.in_progress_count > 0: + # list() é necessário call para criar cópia do objeto que está sendo manipulado em tempo de execução + for future in as_completed(list(self._future_to_data)): + result = future.result() + self._future_to_data.remove(future) + + if self._on_result is not None: + self._on_result(result) + if self._progress is not None: + self._progress.show_progress(self.total_processed) + + def _process(self, func, item, *args, **kwargs): + try: + result = func(item, *args, **kwargs) + self._total_success += 1 + return result + except Exception as exc: + print( + f"Falha ao processa dado, mas será armazenado para conferência.\n" + f"Error: {exc}") + self._failure_data.append(item) + + def get_status(self): + return f"{round(self.total_processed / (time.time() - self._started_at), 2)} cpf/s " \ + f"- processing: {self.in_progress_count} " \ + f"- success: {self._total_success} " \ + f"- fail: {len(self._failure_data)} " + + def process(self, func, data, *args, **kwargs): + """ + Função principal, responsável por controlar o tempo de envio dos dados para processar. + """ + + self._stopped = False + # inicia thread para atualizar o banco com o resultado da validação. + self._create_process_result_service().start() + self._started_at = time.time() + + if self._progress is not None: + self._progress.update_max_value(len(data)) + self._progress.start() + + with ThreadPoolExecutor(max_workers=self._num_workers, + thread_name_prefix="CPF_PROCESS_WORKER") as self._executor: + for item in data: + start_time = time.time() + + future = self._executor.submit(self._process, func, item, *args, **kwargs) + self._future_to_data.add(future) + + elapsed_time = time.time() - start_time + # Verifica quanto tempo passou após enviar um dado, caso o tempo for menor que o intervalo + # configurado espera a diferença antes de enviar o próximo lote + if elapsed_time < self.interval_seconds: + time.sleep(self.interval_seconds - elapsed_time) + if self.in_progress_count >= self._max_in_progress: + print(f"O Total de dados sendo processado {self.in_progress_count} é maior que o predefinido {self._max_in_progress}") + time.sleep(10) + + self.stop_process() + + @property + def stopped(self): + return self._stopped + + def stop_process(self): + self._stopped = True + self._process_result_service.join() + self._progress.stop() + + def restart_process(self): + self.stop_process() # espera terminar execução do processo anterior + self._stopped = False + self._started_at = time.time() + self._failure_data = [] + self._total_success = 0 + self._create_process_result_service().start() \ No newline at end of file diff --git a/cereja/date/_datetime.py b/cereja/date/_datetime.py index a8c6da3..84ec8b3 100644 --- a/cereja/date/_datetime.py +++ b/cereja/date/_datetime.py @@ -35,6 +35,9 @@ class DateTime(datetime): r'\d{2}-\d{2}-\d{4}': '%d-%m-%Y', # Format (DD-MM-YYYY) r'\d{1,2}/\d{1,2}/\d{2,4}': '%m/%d/%Y', # Format (MM/DD/YYYY) r'\d{1,2}-\d{1,2}-\d{2,4}': '%m-%d-%Y', # Format (MM-DD-YYYY) + r'\d{8}': '%Y%m%d', # Format (YYYYMMDD) + r'\d{2}\d{2}\d{4}': '%d%m%Y', # Format (DDMMYYYY) + r'\d{4}\d{2}\d{2}': '%Y%m%d', # Format (YYYYMMDD) without separators # Other formats can be added here } diff --git a/cereja/geo/__init__.py b/cereja/geo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cereja/geo/countries.py b/cereja/geo/countries.py new file mode 100644 index 0000000..f1fe38d --- /dev/null +++ b/cereja/geo/countries.py @@ -0,0 +1,13 @@ +REGIONS = {'Norte': ['Amazonas', 'Roraima', 'Amapá', 'Pará', 'Tocantins', 'Rondônia', 'Acre'], + 'Nordeste': ['Maranhão', 'Piauí', 'Ceará', 'Rio Grande do Norte', 'Pernambuco', 'Paraíba', 'Sergipe', + 'Alagoas', 'Bahia'], + 'Centro-Oeste': ['Mato Grosso', 'Mato Grosso do Sul', 'Goiás', 'Distrito Federal'], + 'Sudeste': ['São Paulo', 'Rio de Janeiro', 'Espírito Santo', 'Minas Gerais'], + 'Sul': ['Paraná', 'Rio Grande do Sul', 'Santa Catarina'] + } +STATES = {'Rondônia': 'RO', 'Acre': 'AC', 'Amazonas': 'AM', 'Roraima': 'RR', 'Pará': 'PA', 'Amapá': 'AP', + 'Tocantins': 'TO', 'Maranhão': 'MA', 'Piauí': 'PI', 'Ceará': 'CE', 'Rio Grande do Norte': 'RN', + 'Paraíba': 'PB', 'Pernambuco': 'PE', 'Alagoas': 'AL', 'Sergipe': 'SE', 'Bahia': 'BA', + 'Minas Gerais': 'MG', 'Espírito Santo': 'ES', 'Rio de Janeiro': 'RJ', 'São Paulo': 'SP', 'Paraná': 'PR', + 'Santa Catarina': 'SC', 'Rio Grande do Sul': 'RS', 'Mato Grosso do Sul': 'MS', 'Mato Grosso': 'MT', + 'Goiás': 'GO', 'Distrito Federal': 'DF'} \ No newline at end of file diff --git a/cereja/geolinear/shapes.py b/cereja/geolinear/shapes.py new file mode 100644 index 0000000..cf3c9d3 --- /dev/null +++ b/cereja/geolinear/shapes.py @@ -0,0 +1,188 @@ +import math + +from .point import Point +from .. import shape_is_ok + + +class Circle: + def __init__(self, center: Point, radius: float): + self.center = center + self.radius = radius + + @property + def area(self) -> float: + """Calculate the area of the circle.""" + return math.pi * self.radius ** 2 + + @property + def circumference(self) -> float: + """Calculate the circumference of the circle.""" + return 2 * math.pi * self.radius + + def contains(self, point: Point) -> bool: + """Check if a given point is inside the circle.""" + return self.center.distance_to(point) <= self.radius + + @property + def diameter(self): + return self.circumference / math.pi + + +class Triangle: + def __init__(self, p1: Point, p2: Point, p3: Point): + self.p1 = p1 + self.p2 = p2 + self.p3 = p3 + + @staticmethod + def side_length(p1: Point, p2: Point) -> float: + """Calculate length of a side between two points.""" + return p1.distance_to(p2) + + @property + def perimeter(self) -> float: + """Calculate the perimeter of the triangle.""" + return self.side_length(self.p1, self.p2) + \ + self.side_length(self.p2, self.p3) + \ + self.side_length(self.p1, self.p3) + + @property + def area(self) -> float: + """Calculate the area of the triangle using Heron's formula.""" + s = self.perimeter / 2 + a = self.side_length(self.p1, self.p2) + b = self.side_length(self.p2, self.p3) + c = self.side_length(self.p1, self.p3) + return math.sqrt(s * (s - a) * (s - b) * (s - c)) + + +class Rectangle: + def __init__(self, point1: Point, point2: Point): + self.point1 = point1 + self.point2 = point2 + + @property + def width(self) -> float: + """Width of the rectangle.""" + return abs(self.point1.x - self.point2.x) + + @property + def height(self) -> float: + """Height of the rectangle.""" + return abs(self.point1.y - self.point2.y) + + @property + def area(self) -> float: + """Calculate the area of the rectangle.""" + return self.width * self.height + + @property + def perimeter(self) -> float: + """Calculate the perimeter of the rectangle.""" + return 2 * (self.width + self.height) + + +class Dimension: + def __init__(self, w, h): + self._width = w + self._height = h + self._ratio = w / h + self._center = w // 2, h // 2 + + @property + def ratio(self): + return self._ratio + + @property + def width(self): + return self._width + + @property + def height(self): + return self._height + + def rect(self, vec, size, keep_ratio=False): + x, y = vec + w = size + h = size + if keep_ratio: + h = h // self.ratio + x2 = x + w + y2 = y + h + + return [(x, y), (x, y2), (x2, y), (x2, y2)] + + @property + def center(self): + return self._center + + @property + def center_x(self): + return self.center[0] + + @property + def center_y(self): + return self.center[1] + + @staticmethod + def fix_rect_pts(pts): + assert shape_is_ok(pts), ValueError(f"Expected a list with 4 points (x, y), received {pts}") + pts = sorted(pts) + x1 = pts[0][0] + x2 = pts[-1][0] + + y1 = pts[0][1] + y2 = pts[-1][1] + return [(x1, y1), (x2, y1), (x2, y2), (x1, y2)] + + @classmethod + def from_rect_points(cls, pts): + points = cls.fix_rect_pts(pts) + w = points[1][0] - points[0][0] + h = points[-1][1] - points[0][1] + return cls(w, h) + + def circle_edges(self, ra): + pass + + def __repr__(self): + return f"Dim(w={self._width}, y={self._height})" + + +if __name__ == "__main__": + BASE_PICTURE_SHEET_DIM = [720, 1280] # WxH + W_SHEET = 653 + H_SHEET = 923 + + # QRCODE + W_QRCODE = 102 + H_QRCODE = 102 + QRCODE_CENTER_X_DIST_TO_WINDOW_CENTER = 241 + QRCODE_CENTER_Y_DIST_TO_WINDOW_CENTER = 396 + + # CELULAR + WINDOW_SIZE = [750, 1334] + WINDOW_CENTER = WINDOW_SIZE[0] // 2, WINDOW_SIZE[1] // 2 + + # pegar proporção + PX = BASE_PICTURE_SHEET_DIM[0] / WINDOW_SIZE[0] + PY = BASE_PICTURE_SHEET_DIM[1] / WINDOW_SIZE[1] + + # Calcula tamanho da folha em relação a tela + W_SHEET_WINDOW, H_SHEET_WINDOW = W_SHEET * PX, H_SHEET * PY + + # Calcula o ponto inicial da folha na tela + WINDOW_SHEET_X1, WINDOW_SHEET_Y1 = ( + WINDOW_CENTER[0] - (W_SHEET_WINDOW // 2), WINDOW_CENTER[1] - H_SHEET_WINDOW // 2) + + # Calcula o ponto final da folha na tela + WINDOW_SHEET_X2, WINDOW_SHEET_Y2 = WINDOW_SHEET_X1 + W_SHEET_WINDOW, WINDOW_SHEET_Y1 + H_SHEET_WINDOW + + # SET LOC QR_CODE + WINDOW_QRCODE_CENTER = WINDOW_CENTER[0] + (QRCODE_CENTER_X_DIST_TO_WINDOW_CENTER * PX), WINDOW_CENTER[1] - ( + QRCODE_CENTER_Y_DIST_TO_WINDOW_CENTER * PY) + W_QRCODE_WINDOW = W_QRCODE * PX + H_QRCODE_WINDOW = H_QRCODE * PY + WINDOW_QRCODE_X1, WINDOW_QRCODE_Y1 = WINDOW_QRCODE_CENTER[0] - (W_QRCODE_WINDOW // 2), WINDOW_QRCODE_CENTER[1] - ( + H_QRCODE_WINDOW // 2) + WINDOW_QRCODE_X2, WINDOW_QRCODE_Y2 = WINDOW_QRCODE_X1 + W_QRCODE_WINDOW, WINDOW_QRCODE_Y1 + H_QRCODE_WINDOW diff --git a/cereja/scraping/__init__.py b/cereja/scraping/__init__.py new file mode 100644 index 0000000..51837c7 --- /dev/null +++ b/cereja/scraping/__init__.py @@ -0,0 +1 @@ +from . import b3 diff --git a/cereja/scraping/_financial.py b/cereja/scraping/_financial.py new file mode 100644 index 0000000..465aaf0 --- /dev/null +++ b/cereja/scraping/_financial.py @@ -0,0 +1,100 @@ +from ..utils import camel_to_snake +try: + from .b3 import Share +except ImportError: + # noinspection PyUnresolvedReferences + Share = None +__all__ = ["FinancialData"] + + +def _parse_keys_to_snake(obj: dict): + if isinstance(obj, dict): + return {camel_to_snake(k): v for k, v in obj.items()} + return obj + + +class FinancialResult: + def __init__(self, describle, value, value2='-'): + self.describle = describle + self.value = value + self.value2 = value2 + + def __repr__(self): + return f"FinancialResult(describle={self.describle}, value={self.value}, value2={self.value2})" + + +class FinancialResultShareholders: + def __init__(self, describle, on, pn, total): + self.describle = describle + self.on = on + self.pn = pn + self.total = total + + def __repr__(self): + return f"FinancialResultShareholders(describle={self.describle}, on={self.on}, pn={self.pn}, total={self.total})" + + +class FinancialStatement: + def __init__(self, title, date_inicial, date_final, results): + self.title = title + self.date_inicial = date_inicial + self.date_final = date_final + self.results = [FinancialResult(**result) for result in results] + + def __repr__(self): + return f"FinancialStatement(title={self.title}, date_inicial={self.date_inicial}, date_final={self.date_final}, results={self.results})" + + +class FreeFloatResult: + def __init__(self, title, describle, quantity, perc, results): + self.title = title + self.describle = describle + self.quantity = quantity + self.perc = perc + self.results = [FinancialResult(**result) for result in results] + + def __repr__(self): + return f"FreeFloatResult(title={self.title}, describle={self.describle}, quantity={self.quantity}, perc={self.perc}, results={self.results})" + + +class ShareholderPosition: + def __init__(self, information_received, name, on, pn, total, results): + self.information_received = information_received + self.name = name + self.on = on + self.pn = pn + self.total = total + self.results = [FinancialResultShareholders(**result) for result in results] + + def __repr__(self): + return f"ShareholderPosition(information_received={self.information_received}, name={self.name}, on={self.on}, pn={self.pn}, total={self.total}, results={self.results})" + + +class CapitalStockComposition: + def __init__(self, title, results): + self.title = title + self.results = [FinancialResult(**result) for result in results] + + def __repr__(self): + return f"CapitalStockComposition(title={self.title}, results={self.results})" + + +class FinancialData: + def __init__(self, share: "Share", title_initial, consolidated, unconsolidated, free_float_result, + position_shareholders, + outstanding_shares, capital_stock_composition): + self._share = share + self.title_initial = title_initial + self.consolidated = [FinancialStatement(**_parse_keys_to_snake(statement)) for statement in consolidated] + self.unconsolidated = [FinancialStatement(**_parse_keys_to_snake(statement)) for statement in unconsolidated] + self.free_float_result = FreeFloatResult(**_parse_keys_to_snake(free_float_result)) + self.position_shareholders = ShareholderPosition(**_parse_keys_to_snake(position_shareholders)) + self.outstanding_shares = _parse_keys_to_snake(outstanding_shares) + self.capital_stock_composition = CapitalStockComposition(**_parse_keys_to_snake(capital_stock_composition)) + + @property + def share(self): + return self._share + + def __repr__(self): + return f"FinancialData({self._share.name})" diff --git a/cereja/scraping/b3.py b/cereja/scraping/b3.py new file mode 100644 index 0000000..6982621 --- /dev/null +++ b/cereja/scraping/b3.py @@ -0,0 +1,148 @@ +from .._requests import request +from ..hashtools import base64_encode +from ..utils import get_zero_mask +from ._financial import FinancialData + +__all__ = ["Share"] +STOCK_EXCHANGES_CONFIG = { + "B3": {"base_api_url": "https://sistemaswebb3-listados.b3.com.br/listedCompaniesProxy/CompanyCall", + "regist_info_endpoint": "GetInitialCompanies", + "head_lines_endpoint": "GetListedHeadLines", + "financial_endpoint": "GetListedFinancial", + } +} + + +class StockExchangeConfig: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(StockExchangeConfig, cls).__new__(cls) + cls._instance._initialize() + return cls._instance + + def _initialize(self): + # Initializes the configurations for the stock exchange APIs + self.config = STOCK_EXCHANGES_CONFIG + + def get_config(self, exchange): + # Returns config for a specific stock exchange + return self.config.get(exchange.upper()) + + +class Share: + def __init__(self, trading_code, exchange="B3", language="pt-br"): + self.trading_code = trading_code.upper() + self.language = language + self.config = StockExchangeConfig().get_config(exchange) + self._head_lines = None + self._financial = None + + if not self.config: + raise ValueError(f"Exchange {exchange} is currently not supported.") + + self._get_share_info() + + def _get(self, url_parsed, timeout=30) -> dict: + response = request.get(url_parsed, timeout=timeout) + if response.code == 200: + return response.json() + raise ConnectionRefusedError(response.data) + + def _get_share_info(self): + + try: + # Fetches and processes the share information from the API + query = {"language": self.language, "pageNumber": 1, "pageSize": 20, "company": self.trading_code} + query_encoded = base64_encode(query).decode() + url = f"{self.config['base_api_url']}/{self.config['regist_info_endpoint']}/{query_encoded}" + response = self._get(url) + results = response.get("results", []) + + if results: + reg_info = results[0] + self.code_cvm = reg_info["codeCVM"] + self.name = reg_info["companyName"] + self.cnpj = get_zero_mask(int(reg_info["cnpj"]), 14) + self.segment = reg_info["segment"] + self.market_indicator = reg_info["marketIndicator"] + self.bdr_type = reg_info["typeBDR"] + self.date_listing = reg_info["dateListing"] + + except Exception as err: + raise Exception(f"Erro ao processar dados de registro. {err}") + + def _get_headlines(self): + + try: + # Fetches and processes the headlines related to the share from the API + query = { + 'agency': self.market_indicator, + 'dateInitial': '2024-05-02', + 'dateFinal': '2024-06-01', + 'issuingCompany': ''.join(char for char in self.trading_code if not char.isnumeric()) + } + query_encoded = base64_encode(query).decode() + url = f"{self.config['base_api_url']}/{self.config['head_lines_endpoint']}/{query_encoded}" + response = self._get(url) + self._head_lines = [] + for headline in response: + self._head_lines.append({ + "headline": headline["headline"], + "date": headline["dateTime"], + "url": headline["url"] + }) + + except Exception as err: + raise Exception(f"Erro ao processar dados de eventos. {err}") + + def _get_financial(self): + try: + # Fetches and processes the headlines related to the share from the API + query = {"codeCVM": self.code_cvm, + "language": "pt-br"} + query_encoded = base64_encode(query).decode() + url = f"{self.config['base_api_url']}/{self.config['financial_endpoint']}/{query_encoded}" + response = self._get(url) + if response: + self._financial = FinancialData( + share=self, + title_initial=response.get("titleInitial", ''), + consolidated=response.get("consolidated", {}), + unconsolidated=response.get("consolidated", {}), + free_float_result=response.get("freeFloatResult", {}), + position_shareholders=response.get("positionShareholders", {}), + outstanding_shares=response.get("outstandingShares", {}), + capital_stock_composition=response.get("capitalStockComposition", {}) + ) + else: + self._financial = {} + except Exception as err: + raise Exception(f"Erro ao processar dados financeiros. {err}") + + @property + def financial(self): + if self._financial is None: + self._get_financial() + return self._financial + + @property + def headlines(self): + if self._head_lines is None: + self._get_headlines() + return self._head_lines + + @property + def exchange(self): + # Returns the stock exchange associated with the share + return self.config.get("exchange") + + @property + def is_bdr(self): + # Checks if the share is a BDR (Brazilian Depositary Receipt) + return bool(self.bdr_type) + + def __repr__(self): + # String representation of the Share instance + return f"{self.trading_code}(cnpj={self.cnpj}, name={self.name})" diff --git a/cereja/system/_win32.py b/cereja/system/_win32.py index 846ba08..c9596b1 100644 --- a/cereja/system/_win32.py +++ b/cereja/system/_win32.py @@ -209,20 +209,22 @@ def max_time_key_press(self, v): def key_map(self): return self.__KEY_NAME_TO_CODE - def _parse_key(self, v): + @classmethod + def _parse_key(cls, v): try: if not isinstance(v, (str, int)): raise ValueError if isinstance(v, str): v = string_to_literal(v) if isinstance(v, str): - return [self.__KEY_NAME_TO_CODE[i] for i in v.split("+")] + return [cls.__KEY_NAME_TO_CODE[i] for i in v.split("+")] except (ValueError, Exception): raise ValueError(f"Value {v} in't valid to keyboard.") return [v] def _is_pressed(self, key_code): - return self.user32.GetAsyncKeyState(key_code) & 0x8000 != 0 + if user32.GetAsyncKeyState(key_code) & 0x8000 != 0: + return self._hwnd is None or self._hwnd == Window.get_foreground_window().hwnd def is_pressed(self, key): """ @@ -277,6 +279,15 @@ def _key_press(self, key_code, n_times=1, secs=None): else: self._press_n_times(key_code[0], n_times=n_times) + def wait_key(self, key, delay=1): + start_time = time.time() + while True: + if time.time() - start_time > delay / 1000.0: + return False + if self.is_pressed(key): + return True + time.sleep(0.01) + def key_press(self, key, n_times=1, secs=None): """ Simulates a key press. @@ -646,7 +657,7 @@ def capture_image_bmp(self, filepath=None, only_window_content=True): # Cria buffer para os dados da imagem bitmap_data = ctypes.create_string_buffer( - abs(bitmap_info.bmiHeader.biWidth * bitmap_info.bmiHeader.biHeight * 4)) + abs(bitmap_info.bmiHeader.biWidth * bitmap_info.bmiHeader.biHeight * 4)) # Obtém os dados da imagem GetDIBits(mem_dc, screenshot, 0, height, bitmap_data, ctypes.byref(bitmap_info), DIB_RGB_COLORS) diff --git a/cereja/utils/_utils.py b/cereja/utils/_utils.py index e942f21..44455c1 100644 --- a/cereja/utils/_utils.py +++ b/cereja/utils/_utils.py @@ -20,6 +20,8 @@ import ctypes import gc import math +import re +import string import time from collections import OrderedDict, defaultdict from importlib import import_module @@ -27,7 +29,7 @@ import sys import types import random -from typing import Any, Union, List, Tuple, Sequence, Iterable, Dict, MappingView, Optional, Callable +from typing import Any, Union, List, Tuple, Sequence, Iterable, Dict, MappingView, Optional, Callable, AnyStr import logging import itertools from copy import copy @@ -89,7 +91,8 @@ "has_length", "combinations", "combinations_sizes", - "value_from_memory" + "value_from_memory", + "str_gen" ] logger = logging.getLogger(__name__) @@ -112,7 +115,6 @@ def _format(self, object, *args): super()._format(object, *args) - def is_indexable(v): return hasattr(v, "__getitem__") @@ -1469,3 +1471,8 @@ def prune_values(values: Sequence, factor=2): if len(res) == 0: return values[k] return res + + +def str_gen(pattern: AnyStr) -> Sequence[AnyStr]: + regex = re.compile(pattern) + return regex.findall(string.printable)