Skip to content

Commit

Permalink
Merge pull request #52
Browse files Browse the repository at this point in the history
v3.1.0
  • Loading branch information
MatteoCampinoti94 authored Sep 25, 2024
2 parents aa86619 + f8149f7 commit b948dc1
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 178 deletions.
2 changes: 1 addition & 1 deletion acacore/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.0.11"
__version__ = "3.1.0"
2 changes: 2 additions & 0 deletions acacore/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from . import click
from . import decorators
from . import functions
from . import helpers
from . import io
Expand Down
179 changes: 179 additions & 0 deletions acacore/utils/click.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
from datetime import datetime
from logging import ERROR
from logging import INFO
from logging import Logger
from pathlib import Path
from re import compile as re_compile
from re import Pattern
from sqlite3 import DatabaseError
from sys import stdout
from traceback import format_tb
from typing import Callable

from click import BadParameter
from click import Command
from click import Context
from click import Parameter

from acacore.models.history import HistoryEntry
from acacore.utils.helpers import ExceptionManager
from acacore.utils.log import setup_logger


def ctx_params(ctx: Context) -> dict[str, Parameter]:
"""
Get parameters from a click context as a dict.
:param ctx: The ``Context`` object of the command from which to extract parameters.
:return: A dict of all the parameters of the context's command.
"""
return {p.name: p for p in ctx.command.params}


def param_callback_regex(
pattern: str,
flags: int = 0,
) -> Callable[[Context, Parameter, str | tuple[str, ...] | None], str | tuple[str, ...] | None]:
"""
Create a ``click.Parameter`` callback that matches the argument against a given regex pattern.
If the value is None, the value is returned as is. If the value is a tuple (e.g., of the parameter is variadic),
then each item of the tuple is matched. If the value is not None, str, or a tuple, then the value is returned as is.
:param pattern: The pattern to match against.
:param flags: The flags to use for the match.
:return: A ``click.Parameter`` callback function with the signature ``(Context, Parameter, T) -> T``.
"""
compiled_pattern: Pattern = re_compile(pattern, flags)

def callback(ctx: Context, param: Parameter, value: str | tuple[str, ...] | None) -> str | tuple[str, ...] | None:
if value is None:
return value
elif isinstance(value, str) and not compiled_pattern.match(value): # noqa: SIM114
raise BadParameter(f"does not match {pattern!r}", ctx, param)
elif isinstance(value, tuple) and any(not compiled_pattern.match(v) for v in value):
raise BadParameter(f"does not match {pattern!r}", ctx, param)
return value

return callback


def copy_params(command: Command) -> Callable[[Command], Command]:
"""
Copy parameters from one ``Command`` to another.
:param command: The command from which to copy the parameters.
"""

def decorator(command2: Command) -> Command:
command2.params.extend(command.params.copy())
return command2

return decorator


def check_database_version(ctx: Context, param: Parameter, path: Path):
"""
Check if the database at ``path`` is the latest version or not.
:param ctx: The context of the parameter.
:param param: The parameter from which the path value originates.
:param path: The path to the database.
:raises BadParameter: If the database version is not the latest.
"""
if not path.is_file():
return

from acacore.database import FileDB
from acacore.database.upgrade import is_latest

with FileDB(path, check_version=False) as db:
try:
is_latest(db, raise_on_difference=True)
except DatabaseError as err:
raise BadParameter(err.args[0], ctx, param)


def start_program(
ctx: Context,
database: "FileDB", # noqa: F821
version: str,
time: datetime | None = None,
log_file: bool = True,
log_stdout: bool = True,
dry_run: bool = False,
) -> tuple[Logger | None, Logger | None, HistoryEntry]:
"""
Create loggers and ``HistoryEntry`` for the start of a click program.
If ``log_file`` is ``False``, the file logger return value is ``None``. If ``log_stdout`` is ``False``, the
standard output logger return value is ``None``.
If ``dry_run`` is ``True``, the start event is not added to the database.
:param ctx: The context of the command that should be logged.
:param database: The database instance.
:param version: The version of the command/program.
:param time: Optionally, the time to use for the ``HistoryEntry`` event. Defaults to now.
:param log_file: Whether a file log should be opened and returned. Defaults to ``False``.
:param log_stdout: Whether a standard output log should be opened and returned. Defaults to ``False``.
:param dry_run: Whether the command is run in dry-run mode.
:return: A tuple containing the file logger (if set with ``log_file`` otherwise ``None``), the standard output
logger (if set with ``log_stdout`` otherwise ``None``), and the ``HistoryEntry`` start event.
"""
prog: str = ctx.find_root().command.name
log_file: Logger | None = (
setup_logger(f"{prog}_file", files=[database.path.parent / f"{prog}.log"]) if log_file else None
)
log_stdout: Logger | None = setup_logger(f"{prog}_stdout", streams=[stdout]) if log_stdout else None
program_start: HistoryEntry = HistoryEntry.command_history(
ctx,
"start",
data={"version": version},
add_params_to_data=True,
time=time,
)

if not dry_run:
database.history.insert(program_start)

if log_file:
program_start.log(INFO, log_file)
if log_stdout:
program_start.log(INFO, log_stdout, show_args=False)

return log_file, log_stdout, program_start


def end_program(
ctx: Context,
database: "FileDB", # noqa: F821
exception: ExceptionManager,
dry_run: bool = False,
*loggers: Logger | None,
):
"""
Create ``HistoryEntry`` event for the end of a click program.
If ``dry_run`` is ``True``, the end event is not added to the database, and the database changes are not committed.
:param ctx: The context of the command that should be logged.
:param database: The database instance.
:param exception: An ``ExceptionManager`` object that wrapped the command execution.
:param dry_run: Whether the command was run in dry-run mode.
:param loggers: A list of loggers to which to save the end event.
"""
program_end: HistoryEntry = HistoryEntry.command_history(
ctx,
"end",
data=repr(exception.exception) if exception.exception else None,
reason="".join(format_tb(exception.traceback)) if exception.traceback else None,
)

for logger in loggers:
if logger:
program_end.log(ERROR if exception.exception else INFO, logger)

if not dry_run:
database.history.insert(program_end)
database.commit()
16 changes: 16 additions & 0 deletions acacore/utils/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any
from typing import Callable


def docstring_format(**kwargs: Any) -> Callable[[Callable], Callable]: # noqa: ANN401
"""
Format a docstring with keyword arguments.
:param kwargs: The parameter(s) to use to format the docstring.
"""

def decorator(func: Callable) -> Callable:
func.__doc__ = (func.__doc__ or "").format(**kwargs)
return func

return decorator
Loading

0 comments on commit b948dc1

Please sign in to comment.