From 874fed79453b5c43a9eb7cbe01bf289999a3e3fc Mon Sep 17 00:00:00 2001 From: Magnus Lindholm Date: Tue, 17 Oct 2023 11:23:29 +0200 Subject: [PATCH] refactored my siegfried implementation to be compatible with the one on main: Now only the methods that handled aca identification remain and they have been moved to the file class. * Also added datamodels to check the data we get from reference files --- acacore/database/base.py | 16 +- acacore/database/column.py | 7 +- acacore/database/files_db.py | 9 +- acacore/models/__init__.py | 1 + acacore/models/file.py | 76 ++++++++- acacore/models/file_data.py | 5 +- acacore/models/history.py | 3 +- acacore/models/identification.py | 3 +- acacore/models/reference_files.py | 24 +++ acacore/reference_files/ref_files.py | 25 ++- acacore/siegfried/siegfried.py | 35 ++--- acacore/siegfried_utils/__init__.py | 0 acacore/siegfried_utils/identify.py | 227 --------------------------- acacore/utils/functions.py | 4 +- acacore/utils/log.py | 7 +- 15 files changed, 145 insertions(+), 297 deletions(-) create mode 100644 acacore/models/reference_files.py delete mode 100644 acacore/siegfried_utils/__init__.py delete mode 100644 acacore/siegfried_utils/identify.py diff --git a/acacore/database/base.py b/acacore/database/base.py index d95857a..6bee159 100644 --- a/acacore/database/base.py +++ b/acacore/database/base.py @@ -1,23 +1,13 @@ from datetime import datetime from os import PathLike from pathlib import Path -from sqlite3 import Connection +from sqlite3 import Connection, OperationalError from sqlite3 import Cursor as SQLiteCursor -from sqlite3 import OperationalError -from typing import Any -from typing import Generator -from typing import Generic -from typing import Optional -from typing import Type -from typing import TypeVar -from typing import Union -from typing import overload +from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload from pydantic.main import BaseModel -from .column import Column -from 
.column import SelectColumn -from .column import model_to_columns +from .column import Column, SelectColumn, model_to_columns T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/database/column.py b/acacore/database/column.py index acc06dd..249c452 100644 --- a/acacore/database/column.py +++ b/acacore/database/column.py @@ -1,11 +1,6 @@ from datetime import datetime from pathlib import Path -from typing import Callable -from typing import Generic -from typing import Optional -from typing import Type -from typing import TypeVar -from typing import Union +from typing import Callable, Generic, Optional, Type, TypeVar, Union from uuid import UUID from pydantic import BaseModel diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index 8cb9024..d36d21a 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,15 +1,12 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Optional -from typing import Type -from typing import Union +from typing import Optional, Type, Union from uuid import UUID from acacore.utils.functions import or_none -from .base import Column -from .base import FileDBBase -from .base import SelectColumn + +from .base import Column, FileDBBase, SelectColumn class FileDB(FileDBBase): diff --git a/acacore/models/__init__.py b/acacore/models/__init__.py index 7b1839e..eea3b30 100644 --- a/acacore/models/__init__.py +++ b/acacore/models/__init__.py @@ -3,3 +3,4 @@ from . import file_data from . import identification from . import metadata +from . 
import reference_files diff --git a/acacore/models/file.py b/acacore/models/file.py index d1af288..f81a47c 100644 --- a/acacore/models/file.py +++ b/acacore/models/file.py @@ -1,17 +1,19 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +import re from pathlib import Path -from typing import Optional +from typing import Optional, Tuple -from pydantic import Field -from pydantic import UUID4 +from pydantic import UUID4, Field +from acacore.models.reference_files import CustomSignature +from acacore.siegfried.siegfried import Siegfried, SiegfriedFile from acacore.utils.io import size_fmt + from .base import ACABase from .identification import Identification - # ----------------------------------------------------------------------------- # Model # ----------------------------------------------------------------------------- @@ -31,6 +33,50 @@ class File(ACABase): warning: Optional[str] = None action: Optional[str] = None + def identify(self, sf: Siegfried) -> SiegfriedFile: + """Identify the file using `siegfried`. + + Args: + sf (Siegfried): A Siegfried class object + + Returns: + SiegfriedFile: A dataclass object containing the results from the identification + """ + return sf.identify(self.get_absolute_path())[0] + + def re_identify_with_aca(self, costum_sigs: list[CustomSignature]) -> None: + """Uses the BOF and EOF to try to determine a ACAUID for the file. + + The costum_sigs list should be found on the `reference_files` repo. + If no match can be found, the method does nothing. + + Args: + costum_sigs: A list of the costum_signatures that the file should be checked against + """ + bof, eof = self.get_bof_and_eof() + # We have to go through all of the signatures in order to check their BOF en EOF with the file. 
+ for sig in costum_sigs: + if sig.bof and sig.eof: + bof_pattern = re.compile(sig.bof) + eof_pattern = re.compile(sig.eof) + if sig.operator == "OR": + if bof_pattern.search(bof) or eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.operator == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.bof: + bof_pattern = re.compile(sig.bof) + if bof_pattern.search(bof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.eof: + eof_pattern = re.compile(sig.eof) + if eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + def get_absolute_path(self, root: Optional[Path] = None) -> Path: return root.joinpath(self.relative_path) if root else self.relative_path.resolve() @@ -96,6 +142,28 @@ def size_fmt(self) -> str: """ return str(size_fmt(self.get_absolute_path().stat().st_size)) + def get_bof_and_eof(self) -> Tuple[str, str]: + """Get the first and last kilobyte of the file. + + Args: + file (Path): Path to file + + Returns: + Tuple[str,str]: BOF and then EOF as `str`. 
+ """ + file = self.get_absolute_path() + with file.open("rb") as file_bytes: + # BOF + bof = file_bytes.read(1024).hex() + # Navigate to EOF + try: + file_bytes.seek(-1024, 2) + except OSError: + # File too small :) + file_bytes.seek(-file_bytes.tell(), 2) + eof = file_bytes.read(1024).hex() + return (bof, eof) + class ArchiveFile(Identification, File): """ArchiveFile data model.""" diff --git a/acacore/models/file_data.py b/acacore/models/file_data.py index 6f918d8..bfe76c3 100644 --- a/acacore/models/file_data.py +++ b/acacore/models/file_data.py @@ -1,11 +1,10 @@ from pathlib import Path -from typing import Any -from typing import ClassVar -from typing import Optional +from typing import Any, ClassVar, Optional from pydantic import model_validator from acacore.database.files_db import FileDB + from .base import ACABase from .file import ArchiveFile diff --git a/acacore/models/history.py b/acacore/models/history.py index a8e7e80..e6e458d 100644 --- a/acacore/models/history.py +++ b/acacore/models/history.py @@ -1,6 +1,5 @@ from datetime import datetime -from typing import Optional -from typing import Union +from typing import Optional, Union from pydantic import UUID4 diff --git a/acacore/models/identification.py b/acacore/models/identification.py index a5c4e60..9effa0b 100644 --- a/acacore/models/identification.py +++ b/acacore/models/identification.py @@ -1,5 +1,4 @@ -from typing import Any -from typing import Optional +from typing import Any, Optional from pydantic import model_validator diff --git a/acacore/models/reference_files.py b/acacore/models/reference_files.py new file mode 100644 index 0000000..9681653 --- /dev/null +++ b/acacore/models/reference_files.py @@ -0,0 +1,24 @@ +"""Data models for the data on saved to different .json files on the `reference_files` repo.""" +from typing import Optional + +from pydantic import BaseModel + + +class ReIdentifyModel(BaseModel): + """Data model for the `to_reidentify` from reference_files.""" + + puid: 
Optional[str] = None + name: Optional[str] = None + ext: Optional[str] = None + reasoning: Optional[str] = None + + +class CustomSignature(BaseModel): + """Data model for the `costum_signatures` from reference_files.""" + + bof: Optional[str] = None + eof: Optional[str] = None + operator: Optional[str] = None + puid: Optional[str] = None + signature: Optional[str] = None + extension: Optional[str] = None diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py index 0098d59..6fbcbcc 100644 --- a/acacore/reference_files/ref_files.py +++ b/acacore/reference_files/ref_files.py @@ -3,9 +3,11 @@ from http.client import HTTPResponse from urllib import request +from models.reference_files import CustomSignature, ReIdentifyModel + @lru_cache -def to_re_identify() -> dict[str, str]: +def to_re_identify() -> list[ReIdentifyModel]: """Gets the json file with the different formats that we wish to reidentify. Is kept updated on the reference-files repo. The function caches the result, @@ -22,11 +24,16 @@ def to_re_identify() -> dict[str, str]: if re_identify_map is None: raise ConnectionError - return re_identify_map + result_list: list[ReIdentifyModel] = [] + for key, values in re_identify_map.items(): + result = ReIdentifyModel(puid=key, **values) + result_list.append(result) + + return result_list @lru_cache -def costum_sigs() -> list[dict]: +def costum_sigs() -> list[CustomSignature]: """Gets the json file with our own costum formats in a list. Is kept updated on the reference-files repo. 
The function caches the result, @@ -38,9 +45,15 @@ def costum_sigs() -> list[dict]: if response.getcode() != 200: raise ConnectionError - re_identify_map: dict[str, str] = json.loads(response.read()) + custom_list: list[dict] = json.loads(response.read()) - if re_identify_map is None: + if custom_list is None: raise ConnectionError - return re_identify_map + result_list: list[CustomSignature] = [] + + for values in custom_list: + result = CustomSignature(**values) + result_list.append(result) + + return result_list diff --git a/acacore/siegfried/siegfried.py b/acacore/siegfried/siegfried.py index 6edd25b..f46a688 100644 --- a/acacore/siegfried/siegfried.py +++ b/acacore/siegfried/siegfried.py @@ -1,15 +1,10 @@ from datetime import datetime from os import PathLike from pathlib import Path -from subprocess import CompletedProcess -from subprocess import run -from typing import Optional -from typing import Union +from subprocess import CompletedProcess, run +from typing import Optional, Union -from pydantic import BaseModel -from pydantic import ConfigDict -from pydantic import Field -from pydantic import field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator from acacore.exceptions.files import IdentificationError @@ -17,11 +12,11 @@ def _check_process(process: CompletedProcess): """ Raises: - IdentificationError: if the process ends with a return code other than 0 - """ + IdentificationError: if the process ends with a return code other than 0. 
+ """ # noqa: D205 if process.returncode != 0: raise IdentificationError( - process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}" + process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}", ) @@ -32,8 +27,8 @@ class SiegfriedIdentifier(BaseModel): class SiegfriedMatch(BaseModel): ns: str - id: Optional[str] - format: str + id: Optional[str] # noqa: A003 + format: str # noqa: A003 version: str mime: str match_class: str = Field(alias="class") @@ -74,16 +69,16 @@ class Siegfried: https://github.com/richardlehane/siegfried """ - def __init__(self, binary: Union[str, PathLike] = "sf"): + def __init__(self, binary: Union[str, PathLike] = "sf") -> None: """ Args: - binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable + binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable. Raises: - IdentificationError: If Siegfried is not configured properly - """ + IdentificationError: If Siegfried is not configured properly. 
+ """ # noqa: D205 self.binary: str = str(binary) - _check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8")) + _check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8")) # noqa: PLW1510 def identify(self, path: Union[str, PathLike]) -> SiegfriedResult: """ @@ -102,6 +97,7 @@ def identify(self, path: Union[str, PathLike]) -> SiegfriedResult: [self.binary, "-json", "-multi", "1024", str(path)], capture_output=True, encoding="utf-8", + check=False, ) _check_process(process) try: @@ -117,7 +113,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]: paths: The paths to the files Returns: - A tuple of tuples joining the paths with their SiegfriedFile result + tuple[tuple[Path, SiegfriedFile]: A tuple of tuples joining the paths with their SiegfriedFile result Raises: IdentificationError: If there is an error calling Siegfried or processing its results @@ -126,6 +122,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]: [self.binary, "-json", "-multi", "1024", *map(str, paths)], capture_output=True, encoding="utf-8", + check=False, ) _check_process(process) try: diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py deleted file mode 100644 index 46b08e9..0000000 --- a/acacore/siegfried_utils/identify.py +++ /dev/null @@ -1,227 +0,0 @@ -import json -import re -import subprocess -from logging import Logger -from pathlib import Path -from typing import Any, Optional, Tuple - -from acacore.models.identification import Identification -from acacore.reference_files.ref_files import costum_sigs, to_re_identify - - -def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: - """Uses the BOF and EOF to try to determine a ACAUID for a file and update its Identification datastructure. 
- - If none can be found, simply return the same Identification data structure as it got in the beginning. - - Args: - path (Path): PAth to the file - file_id (Identification): The file identification data structure that should be updated with the new values - - Returns: - Identification: The updated file data structure. - """ - bof, eof = get_bof_and_eof(path) - - sig_for_file = get_aca_signature(bof, eof) - if sig_for_file: - update_file_id(path, file_id, sig_for_file) - return file_id - - -def aca_id(path: Path) -> Optional[str]: - """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. - - Args: - path (Path): Path to the file to be examined - - Returns: - Optional[str]: Possible ACAUID - """ - bof, eof = get_bof_and_eof(path) - - sig_for_file = get_aca_signature(bof, eof) - if not sig_for_file: - return None - - return sig_for_file.get("puid", None) - - -def get_bof_and_eof(file: Path) -> Tuple[str, str]: - """Get the first and last kilobyte of a file. - - Args: - file (Path): Path to file - - Returns: - Tuple[str,str]: BOF and then EOF as `str`. - """ - with file.open("rb") as file_bytes: - # BOF - bof = file_bytes.read(1024).hex() - # Navigate to EOF - try: - file_bytes.seek(-1024, 2) - except OSError: - # File too small :) - file_bytes.seek(-file_bytes.tell(), 2) - eof = file_bytes.read(1024).hex() - return (bof, eof) - - -def get_aca_signature(bof: str, eof: str) -> Optional[dict]: - """Get the ACA signature of a file type, if one exists. Else return `None`. - - Args: - bof (str): The first kilobyte of a file - eof (str): The last kilobyte of a file - - Returns: - Optional(str): The signature, if one was found. 
- """ - aca_signatures: list[dict] = costum_sigs() - for sig in aca_signatures: - if "bof" in sig and "eof" in sig: - bof_pattern = re.compile(sig["bof"]) - eof_pattern = re.compile(sig["eof"]) - if sig["operator"] == "OR": - if bof_pattern.search(bof) or eof_pattern.search(eof): - return sig - elif sig["operator"] == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): - return sig - elif "bof" in sig: - bof_pattern = re.compile(sig["bof"]) - if bof_pattern.search(bof): - return sig - elif "eof" in sig: - eof_pattern = re.compile(sig["eof"]) - if eof_pattern.search(eof): - return sig - return None - - -def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]: - """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained. - - Also updates FileInfo with obtained PUID, signature name, and warning if applicable. - - Parameters - ---------- - path : pathlib.Path - Path in which to identify files. - - Returns: - ------- - Dict[Path, Identification] - Dictionary containing file path and associated identification - information obtained from siegfried's stdout. 
- - """ - id_dict: dict[Path, Identification] = {} - - id_result = run_sf_and_get_results_json(path) - - # We get identifiers as a list containing the ditionary, - # soo we have to get the one element our of it - results_dict: Optional[dict] = id_result.get("identifiers", None)[0] - if results_dict and log: - DROID_file_version: Optional[str] = results_dict.get("details") - log.info( - "Running sf with the following version of DROID: " + DROID_file_version if DROID_file_version else "", - ) - for file_result in id_result.get("files", []): - match: dict[str, Any] = {} - for id_match in file_result.get("matches"): - if id_match.get("ns") == "pronom": - match = id_match - if match: - file_identification: Identification - file_path: Path = Path(file_result["filename"]) - - puid = None if match.get("id", "").lower() == "unknown" else match.get("id") - - signature_and_version = None - signature = match.get("format") - version = match.get("version") - if signature: - signature_and_version = f"{signature} ({version})" - warning: str = match.get("warning", "").capitalize() - file_size: int = file_result.get("filesize") - file_errors: Optional[str] = file_result.get("errors", None) - if file_errors: - warning = warning + " ; Errors: " + file_errors - file_identification = Identification( - puid=puid, - signature=signature_and_version or None, - warning=warning or None, - size=file_size, - ) - - # unindentified files - if puid is None: - file_identification = aca_id_for_file_id(file_path, file_identification) - - # re-identify files, warnings or not! - if puid in to_re_identify(): - file_identification = aca_id_for_file_id(file_path, file_identification) - - # Possible MS Office files identified as markup (XML, HTML etc.) 
- if puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] and "Extension mismatch" in warning: - file_identification = aca_id_for_file_id(file_path, file_identification) - - id_dict.update({file_path: file_identification}) - - return id_dict - - -# --- -# Aux. methods, used as helper methods for the rest of the methods. -# --- - - -def run_sf_and_get_results_json(path: Path) -> dict: - """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. - - Args: - path (Path): A path to a folder containg files or subfolder with files (or more subfolders! ) - - Raises: - OSError: If there is an error with running siegfried or loading the results as a .json file, raises OSError - - Returns: - dict: dictionary created from .json output of siegfried - """ - try: - sf_proc = subprocess.run( - ["sf", "-json", "-multi", "1024", str(path)], - check=True, - capture_output=True, - ) - except Exception as error: - raise OSError(error) - - try: - id_result: dict = json.loads(sf_proc.stdout) - except Exception as error: - raise OSError(error) - - return id_result - - -def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: - """Update a file Identification data model with an PUID and signature given as a dictionary. - - It is primarily used by the `costum_id` method. - Also adds a 'Extension Mismatch' warning if the extension of the file is not as we excpect from the given dict. - - Args: - path (Path): Path to the file - file_id (Identification): File identification data model - signature (dict[str, str]): Dictionary with new values for PUID and signature. 
- """ - file_id.puid = signature["puid"] - file_id.signature = signature["signature"] - if path.suffix.lower() != signature["extension"].lower(): - file_id.warning = "Extension mismatch" - else: - file_id.warning = None diff --git a/acacore/utils/functions.py b/acacore/utils/functions.py index 6aa828e..e72e482 100644 --- a/acacore/utils/functions.py +++ b/acacore/utils/functions.py @@ -1,6 +1,4 @@ -from typing import Callable -from typing import Optional -from typing import TypeVar +from typing import Callable, Optional, TypeVar T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/utils/log.py b/acacore/utils/log.py index 4569a8b..8b249b8 100644 --- a/acacore/utils/log.py +++ b/acacore/utils/log.py @@ -1,8 +1,4 @@ -from logging import FileHandler -from logging import Formatter -from logging import INFO -from logging import Logger -from logging import getLogger +from logging import INFO, FileHandler, Formatter, Logger, getLogger from pathlib import Path @@ -20,7 +16,6 @@ def setup_logger(log_name: str, log_path: Path) -> Logger: Returns: A Logger instance. """ - if not log_path.parent.exists(): Path.mkdir(log_path.parent, parents=True, exist_ok=True)