From 6b3a2144eb8ef1f4307a8e0f6da88dc78fca0fdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Mu=C3=B1oz?= Date: Thu, 21 Sep 2023 22:26:44 +0200 Subject: [PATCH] Extract tracing to carpeta Python package --- cartuli/__main__.py | 2 +- cartuli/tracing/__init__.py | 10 -- cartuli/tracing/logging.py | 23 ---- cartuli/tracing/output.py | 113 ------------------ cartuli/tracing/trace.py | 232 ------------------------------------ requirements.txt | 3 +- tests/test_tracing.py | 93 --------------- 7 files changed, 3 insertions(+), 473 deletions(-) delete mode 100644 cartuli/tracing/__init__.py delete mode 100644 cartuli/tracing/logging.py delete mode 100644 cartuli/tracing/output.py delete mode 100644 cartuli/tracing/trace.py delete mode 100644 tests/test_tracing.py diff --git a/cartuli/__main__.py b/cartuli/__main__.py index db0ee6d..fce4c1b 100644 --- a/cartuli/__main__.py +++ b/cartuli/__main__.py @@ -6,10 +6,10 @@ import re import sys +from carpeta import Tracer, ImageHandler, trace_output from pathlib import Path from .definition import Definition -from .tracing import Tracer, ImageHandler, trace_output def parse_args(args: list[str] = None) -> argparse.Namespace: diff --git a/cartuli/tracing/__init__.py b/cartuli/tracing/__init__.py deleted file mode 100644 index b9beba5..0000000 --- a/cartuli/tracing/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Package with trace image processing functionality.""" -from .trace import Tracer -from .output import trace_output -from .logging import ImageHandler - -# TODO: Extract all this to another repository and Python package - -__all__ = [ - Tracer, trace_output, ImageHandler -] diff --git a/cartuli/tracing/logging.py b/cartuli/tracing/logging.py deleted file mode 100644 index 934ad20..0000000 --- a/cartuli/tracing/logging.py +++ /dev/null @@ -1,23 +0,0 @@ -from datetime import datetime -from logging import Handler, LogRecord, NOTSET - -from .trace import Tracer - - -class ImageHandler(Handler): - def __init__(self, tracer: Tracer, level=NOTSET): - self.__tracer = tracer - super().__init__(level) - - def emit(self, record: LogRecord) -> None: - timestamp = datetime.fromtimestamp(record.created) - - if hasattr(record, 'trace'): - self.__tracer.record( - record.trace, - timestamp=timestamp, - message=record.msg, - function_name=record.funcName, - source_file=record.pathname, - line_number=record.lineno, - ) diff --git a/cartuli/tracing/output.py b/cartuli/tracing/output.py deleted file mode 100644 index d269806..0000000 --- a/cartuli/tracing/output.py +++ /dev/null @@ -1,113 +0,0 @@ -import nbformat as nbf -import yaml - -from pathlib import Path - -from .trace import Trace - - -def trace_html_output(traces: list[Trace], output_file: Path): - PRISM_VERSION = "1.29.0" - PRISM_URL = f"https://cdnjs.cloudflare.com/ajax/libs/prism/{PRISM_VERSION}" - - BOOTSTRAP_VERSION = "5.2.3" - BOOTSTRAP_URL = f"https://cdn.jsdelivr.net/npm/bootstrap@{BOOTSTRAP_VERSION}/dist" - - # TODO: Generate html vía template - # TODO: Properly format margins - with output_file.open('w') as output_file: - output_file.write(f""" - - - - - - - - - - """) - output_file.write(f""" - -
""") - for n, trace in enumerate(traces): - output_file.write(f""" -
""") - for record in trace: - # TODO: Add line numbers in code, this approach does not work - if record.message: - output_file.write(f""" -
{record.message}
""") - output_file.write(f""" -
{record.code}
- -
""") - output_file.write(""" -
""") - output_file.write(""" -
""") - output_file.write(""" - -""") - - -def trace_yaml_output(traces: list[Trace], output_file: Path): - with output_file.open('w') as o: - yaml.dump(list(traces), o) - - -def trace_notebook_output(traces: list[Trace], output_dir: Path): - # TODO: Include required imports - # TODO: Load image in the first cell - # TODO: Comment logging lines - # TODO: Propagate image value between cells - # TODO: Reindent code - output_dir.mkdir(parents=True, exist_ok=True) - - for trace in traces: - output_file = output_dir / f"{trace.name}.ipynb" - - nb = nbf.v4.new_notebook() - for record in trace: - code_cell = nbf.v4.new_code_cell() - code_cell.source = record.code - nb['cells'].append(code_cell) - - nbf.write(nb, output_file) - - -def trace_output(traces: list[Trace], output_path: Path | str): - if isinstance(output_path, str): - output_path = Path(output_path) - - output_path = output_path.expanduser() - - if not output_path.is_absolute(): - output_path = Path.cwd() / output_path - - match output_path: - case Path(suffix='.html'): - trace_html_output(traces, output_path) - case Path(suffix='.yml') | Path(suffix='.yaml'): - trace_yaml_output(traces, output_path) - case Path(suffix=''): - trace_notebook_output(traces, output_path) - case _: - raise ValueError('Unable to identify output format in output_path') diff --git a/cartuli/tracing/trace.py b/cartuli/tracing/trace.py deleted file mode 100644 index a0ff1a8..0000000 --- a/cartuli/tracing/trace.py +++ /dev/null @@ -1,232 +0,0 @@ -from __future__ import annotations - -import base64 -import cv2 as cv -import io -import inspect -import numpy as np -import threading - -from datetime import datetime -from pathlib import Path -from PIL import Image -from typing import Iterable - - -class Record: - # TUNE: This could be applied to any type of object - def __init__(self, image: Image.Image | np.ndarray, timestamp: datetime = None, previous: Record = None, /, - message: str = None, function_name: str = None, source_file: str | Path = None, - line_number: int = None, thread_id: int = None): - if isinstance(image, np.ndarray): - image = Image.fromarray(cv.cvtColor(image, cv.COLOR_BGR2RGB)) - self.__image = image - - if timestamp is None: - timestamp = datetime.now() - self.__timestamp = timestamp - - self.__function_name = function_name - self.__source_file = source_file - self.__line_number = line_number - self.__thread_id = thread_id - self.__previous = previous - self.__message = message - - try: - self.__image_file = Path(image.filename) - except AttributeError: - if previous is not None: - self.__image_file = previous.image_file - else: - self.__image_file = None - - self.__data_uri_image = None - self.__code_lines = None - - @property - def image(self) -> Image.Image: - return self.__image - - @property - def data_uri_image(self) -> bytes: - if self.__data_uri_image is None: - image_buffer = io.BytesIO() - self.image.save(image_buffer, format="PNG") - self.__data_uri_image = \ - f"data:image/png;base64,{base64.b64encode(image_buffer.getvalue()).decode('UTF-8')}" - - return self.__data_uri_image - - @property - def image_file(self) -> Path | None: - return self.__image_file - - @property - def previous(self) -> Record | None: - return self.__previous - - @property - def timestamp(self) -> datetime: - return self.__timestamp - - @property - def message(self) -> str | None: - return self.__message - - @property - def function_name(self) -> str | None: - return self.__function_name - - @property - def source_file(self) -> str | None: - return self.__source_file - - @property - def line_number(self) -> int | None: - return self.__line_number - - @property - def thread_id(self) -> int | None: - return self.__thread_id - - @property - def code_lines(self) -> tuple[str]: - if self.__code_lines is None: - # TUNE: The code can change during execution, maybe this should not be lazy - with Path(self.source_file).open('r') as file: - file_code = file.readlines() - - last_line = self.line_number - if self.previous is None or self.function_name != self.previous.function_name: - first_line = last_line - 1 - else: - first_line = self.previous.line_number - trace_code_lines = file_code[first_line:last_line] - - # TODO: Remove all initial empty lines, not only the first - if not trace_code_lines[0].strip(): - del trace_code_lines[0] - - self.__code_lines = tuple(t.rstrip() for t in trace_code_lines) - - return self.__code_lines - - @property - def code(self) -> str: - return '\n'.join(self.code_lines) - - def __getstate__(self) -> dict: - record_dict = { - 'image': self.data_uri_image, - # TODO: Review iso format and timezone management - 'timestamp': self.timestamp.isoformat(), - 'code_lines': self.code_lines - } - - if self.message is not None: - record_dict |= {'image_file': self.message} - if self.image_file is not None: - record_dict |= {'image_file': str(self.image_file)} - if self.function_name is not None: - record_dict |= {'function_name': self.function_name} - if self.source_file is not None: - record_dict |= {'source_file': self.source_file} - if self.line_number is not None: - record_dict |= {'line_number': self.line_number} - if self.thread_id is not None: - record_dict |= {'thread_id': self.thread_id} - - return record_dict - - -class Trace: - def __init__(self, name: str): - self.__name = name - self.__records = [] - - @property - def name(self) -> str: - return self.__name - - @property - def thread_id(self) -> int: - # TUNE: Not sure if this property should live here - return self.__records[0].thread_id - - def add(self, record: Record): - self.__records.append(record) - - def __len__(self): - return len(self.__records) - - def __getitem__(self, key) -> Record: - return self.__records[key] - - def __iter__(self) -> Iterable[Record]: - return (record for record in self.__records) - - def items(self) -> tuple(Record): - return tuple(self) - - def __getstate__(self) -> dict: - return { - 'name': self.name, - 'records': self.__records, - } - - -class Tracer: - def __init__(self,): - self.__last_thread_trace = {} - self.__traces = [] - - def record(self, image: Image.Image | np.ndarray, /, timestamp: datetime = None, - message: str = None, function_name: str = None, source_file: str = None, - line_number: int = None) -> None: - # TUNE: I tried to use frame info but logging does not return it, - # maybe there is a better way - if function_name is None or source_file is None or line_number is None: - calling_frame = inspect.currentframe().f_back - calling_frame_info = inspect.getframeinfo(calling_frame) - function_name = calling_frame_info.function - source_file = calling_frame_info.filename - line_number = calling_frame_info.lineno - - thread_id = threading.get_native_id() - - # TODO: Replace this with any other way to split traces - # An idea is to define a trace generator class or function - # that defined when a new trace must be created and whats its name - if hasattr(image, 'filename'): - try: - trace_name = Path(image.filename).relative_to(Path.cwd()) - except ValueError: - trace_name = Path(image.filename) - trace = Trace(name=trace_name.stem) - self.__traces.append(trace) - self.__last_thread_trace[thread_id] = trace - previous_trace = None - else: - previous_trace = self.__last_thread_trace[thread_id][-1] - - record = Record( - image, - timestamp, - previous_trace, - message=message, - function_name=function_name, - source_file=source_file, - line_number=line_number, - thread_id=thread_id - ) - self.__last_thread_trace[thread_id].add(record) - - def __len__(self): - return len(self.__traces) - - def __getitem__(self, key) -> Trace: - return self.__traces[key] - - def __iter__(self) -> Iterable[Trace]: - return (trace for trace in self.__traces) diff --git a/requirements.txt b/requirements.txt index 4a861d6..ec9e038 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ reportlab==4.* opencv-python==4.* pillow==10.* pyyaml==6.* -nbformat==5.* \ No newline at end of file +nbformat==5.* +carpeta==0.1.0a0 \ No newline at end of file diff --git a/tests/test_tracing.py b/tests/test_tracing.py deleted file mode 100644 index d5549bf..0000000 --- a/tests/test_tracing.py +++ /dev/null @@ -1,93 +0,0 @@ -import logging - -from PIL import Image, ImageOps - -from cartuli.tracing import Tracer, ImageHandler - - -def tracer_process_image(image: Image.Image, tracer: Tracer): - tracer.record(image) - bw_image = image.convert("L") - tracer.record(bw_image) - return bw_image - - -def tracer_reprocess_image(image: Image.Image, tracer: Tracer): - tracer.record(image) - inverted_image = ImageOps.invert(image) - tracer.record(inverted_image) - return inverted_image - - -def logger_process_image(image: Image.Image): - logger = logging.getLogger('logger_process_image') - logger.info("", extra={'trace': image}) - bw_image = image.convert("L") - logger.info("", extra={'trace': bw_image}) - return bw_image - - -def test_tracer(random_image_file): - tracer = Tracer() - - image = Image.open(random_image_file()) - processed_image = tracer_process_image(image, tracer) - - assert tracer[0][0].image == image - assert tracer[0][0].function_name == 'tracer_process_image' - assert tracer[0][0].line_number == 9 - assert tracer[0][1].image == processed_image - assert tracer[0][1].function_name == 'tracer_process_image' - assert tracer[0][1].line_number == 11 - assert tracer[0][0].timestamp < tracer[0][1].timestamp - - assert len(tracer) == 1 - image = Image.open(random_image_file()) - processed_image = tracer_process_image(image, tracer) - tracer_reprocess_image(processed_image, tracer) - assert len(tracer) == 2 - assert len(tracer[0]) == 2 - assert len(tracer[1]) == 4 - assert tracer[0][0].previous is None - assert tracer[0][1].previous == tracer[0][0] - assert tracer[1][0].previous is None - assert tracer[1][1].previous == tracer[1][0] - assert tracer[1][2].previous == tracer[1][1] - assert tracer[1][3].previous == tracer[1][2] - - -# TUNE: This probably could be implemented as parametrization of the previous test -def test_logging(random_image_file): - tracer = Tracer() - handler = ImageHandler(tracer, logging.INFO) - logger = logging.getLogger('logger_process_image') - logger.setLevel(logging.INFO) - logger.addHandler(handler) - - image = Image.open(random_image_file()) - processed_image = logger_process_image(image) - - assert tracer[0][0].image == image - assert tracer[0][0].function_name == 'logger_process_image' - assert tracer[0][0].line_number == 24 - assert tracer[0][1].image == processed_image - assert tracer[0][1].function_name == 'logger_process_image' - assert tracer[0][1].line_number == 26 - assert tracer[0][0].timestamp < tracer[0][1].timestamp - - -def test_trace_image_file(random_image_file): - tracer = Tracer() - - image_file_1 = random_image_file() - image_1 = Image.open(image_file_1) - tracer_process_image(image_1, tracer) - - image_file_2 = random_image_file() - image_2 = Image.open(image_file_2) - tracer_process_image(image_2, tracer) - - assert tracer[0][0].image_file == image_file_1 - assert tracer[0][1].image_file == image_file_1 - assert tracer[1][0].image_file == image_file_2 - assert tracer[1][1].image_file == image_file_2