From 1e2f9b5f4e99a788b79994de36c0b06509392ee8 Mon Sep 17 00:00:00 2001
From: Magnus Lindholm
Date: Tue, 10 Oct 2023 14:43:06 +0200
Subject: [PATCH 1/7] added functionality to run and use siegfried

* Also added functions to get info from reference files
* Updated the docstring for several of the new functions
* Decomposed the new functions into multiple helper functions to avoid duplicate code
---
 acacore/reference_files/__init__.py  |   2 +
 acacore/reference_files/ref_files.py |  44 +++++
 acacore/siegfried_utils/__init__.py  |   2 +
 acacore/siegfried_utils/identify.py  | 249 +++++++++++++++++++++++++++
 4 files changed, 297 insertions(+)
 create mode 100644 acacore/reference_files/__init__.py
 create mode 100644 acacore/reference_files/ref_files.py
 create mode 100644 acacore/siegfried_utils/__init__.py
 create mode 100644 acacore/siegfried_utils/identify.py

diff --git a/acacore/reference_files/__init__.py b/acacore/reference_files/__init__.py
new file mode 100644
index 0000000..c603535
--- /dev/null
+++ b/acacore/reference_files/__init__.py
@@ -0,0 +1,2 @@
+"""Collection of methods that allows us to """
+from . import ref_files
\ No newline at end of file
diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py
new file mode 100644
index 0000000..50456bd
--- /dev/null
+++ b/acacore/reference_files/ref_files.py
@@ -0,0 +1,44 @@
+import json
+from functools import lru_cache
+from http.client import HTTPResponse
+from urllib import request
+
+
+@lru_cache
+def to_re_identify() -> dict[str, str]:
+    """Gets the json file with the different formats that we wish to reidentify.
+
+    Is kept updated on the reference-files repo. The function caches the result,
+    soo multiple calls in the same run should not be an issue.
+    """
+    response: HTTPResponse = request.urlopen("https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json")
+    if response.getcode() != 200:
+        raise ConnectionError
+
+    re_identify_map: dict[str, str] = json.loads(response.read())
+
+    if re_identify_map is None:
+        raise ConnectionError
+
+    return re_identify_map
+
+
+@lru_cache
+def costum_sigs() -> list[dict]:
+    """Gets the json file with our own costum formats in a list.
+
+    Is kept updated on the reference-files repo. The function caches the result,
+    soo multiple calls in the same run should not be an issue.
+    """
+    response: HTTPResponse = request.urlopen(
+        "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json",
+    )
+    if response.getcode() != 200:
+        raise ConnectionError
+
+    re_identify_map: dict[str, str] = json.loads(response.read())
+
+    if re_identify_map is None:
+        raise ConnectionError
+
+    return re_identify_map
diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py
new file mode 100644
index 0000000..b0f30ed
--- /dev/null
+++ b/acacore/siegfried_utils/__init__.py
@@ -0,0 +1,2 @@
+"""Place for functions and classes that act as an entrance to siegfried"""
+from .
import identify \ No newline at end of file diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py new file mode 100644 index 0000000..6425bad --- /dev/null +++ b/acacore/siegfried_utils/identify.py @@ -0,0 +1,249 @@ +import json +import re +import subprocess +from logging import Logger +from pathlib import Path +from typing import Any, Optional, Tuple + +from acacore.models.identification import Identification +from acacore.reference_files.ref_files import costum_sigs, to_re_identify + + +def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: + """Uses the BOF and EOF to try to determine a ACAUID for a file and update its Identification datastructure. + + If none can be found, simply return the same Identification data structure as it got in the beginning. + + Args: + path (Path): PAth to the file + file_id (Identification): The file identification data structure that should be updated with the new values + + Returns: + Identification: The updated file data structure. + """ + bof, eof = get_bof_and_eof(path) + + sig_for_file = get_aca_signature(bof, eof) + if sig_for_file: + update_file_id(path, file_id, sig_for_file) + return file_id + +def aca_id(path: Path) -> Optional[str]: + """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. + + Args: + path (Path): Path to the file to be examined + + Returns: + Optional[str]: Possible ACAUID + """ + bof, eof = get_bof_and_eof(path) + + sig_for_file = get_aca_signature(bof, eof) + if not sig_for_file: + return None + + return sig_for_file.get("puid", None) + +def get_bof_and_eof(file: Path) -> Tuple[str,str]: + """Get the first and last kilobyte of a file. + + Args: + file (Path): Path to file + + Returns: + Tuple[str,str]: BOF and then EOF as `str`. + """ + with file.open("rb") as file_bytes: + # BOF + bof = file_bytes.read(1024).hex() + # Navigate to EOF + try: + file_bytes.seek(-1024, 2) + except OSError: + # File too small :) + file_bytes.seek(-file_bytes.tell(), 2) + eof = file_bytes.read(1024).hex() + return (bof, eof) + +def get_aca_signature(bof: str, eof: str) -> Optional[dict]: + """Get the ACA signature of a file type, if one exists. Else return `None`. + + Args: + bof (str): The first kilobyte of a file + eof (str): The last kilobyte of a file + + Returns: + Optional(str): The signature, if one was found. + """ + aca_signatures: list[dict] = costum_sigs() + for sig in aca_signatures: + if "bof" in sig and "eof" in sig: + bof_pattern = re.compile(sig["bof"]) + eof_pattern = re.compile(sig["eof"]) + if sig["operator"] == "OR": + if bof_pattern.search(bof) or eof_pattern.search(eof): + return sig + elif sig["operator"] == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): + return sig + elif "bof" in sig: + bof_pattern = re.compile(sig["bof"]) + if bof_pattern.search(bof): + return sig + elif "eof" in sig: + eof_pattern = re.compile(sig["eof"]) + if eof_pattern.search(eof): + return sig + return None + +def sf_id_puid(path: Path) -> Optional[str]: + """Identify a file and return only its PUID using siegfried. + + Args: + path (`Path`): Path to the file to identify. 
+ + Returns: + Optional[str]: The PUID of the file, or `None` if none is found + """ + id_result = run_sf_and_get_results_json(path) + + puid: Optional[str] = None + for file_result in id_result.get("files", []): + match: dict = {} + for id_match in file_result.get("matches"): + if id_match.get("ns") == "pronom": + match = id_match + if match: + puid = None if match.get("id", "").lower() == "unknown" else match.get("id") + break + return puid + + +def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]: + """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained. + + Also updates FileInfo with obtained PUID, signature name, and warning if applicable. + + Parameters + ---------- + path : pathlib.Path + Path in which to identify files. + + Returns: + ------- + Dict[Path, Identification] + Dictionary containing file path and associated identification + information obtained from siegfried's stdout. + + """ + id_dict: dict[Path, Identification] = {} + + id_result = run_sf_and_get_results_json(path) + + + # We get identifiers as a list containing the ditionary, + # soo we have to get the one element our of it + results_dict: Optional[dict] = id_result.get("identifiers", None)[0] + if results_dict and log: + DROID_file_version: Optional[str] = results_dict.get("details") + log.info( + "Running sf with the following version of DROID: " + DROID_file_version + if DROID_file_version + else "", + ) + for file_result in id_result.get("files", []): + match: dict[str, Any] = {} + for id_match in file_result.get("matches"): + if id_match.get("ns") == "pronom": + match = id_match + if match: + file_identification: Identification + file_path: Path = Path(file_result["filename"]) + + puid = None if match.get("id", "").lower() == "unknown" else match.get("id") + + signature_and_version = None + signature = match.get("format") + version = match.get("version") + if signature: + signature_and_version = f"{signature} ({version})" + warning: str = match.get("warning", "").capitalize() + file_size: int = file_result.get("filesize") + file_errors: Optional[str] = file_result.get("errors", None) + if file_errors: + warning = warning + " ; Errors: " + file_errors + file_identification = Identification( + puid=puid, + signature=signature_and_version or None, + warning=warning or None, + size=file_size, + ) + + # unindentified files + if puid is None: + file_identification = aca_id_for_file_id(file_path, file_identification) + + # re-identify files, warnings or not! + if puid in to_re_identify(): + file_identification = aca_id_for_file_id(file_path, file_identification) + + # Possible MS Office files identified as markup (XML, HTML etc.) + if ( + puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] + and "Extension mismatch" in warning + ): + file_identification = aca_id_for_file_id(file_path, file_identification) + + id_dict.update({file_path: file_identification}) + + return id_dict + +# --- +# Aux. methods, used as helper methods for the rest of the methods. +# --- + +def run_sf_and_get_results_json(path: Path) -> dict: + """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. + + Args: + path (Path): A path to a folder containg files or subfolder with files (or more subfolders! 
) + + Raises: + OSError: If there is an error with running siegfried or loading the results as a .json file, raises OSError + + Returns: + dict: dictionary created from .json output of siegfried + """ + try: + sf_proc = subprocess.run( + ["sf", "-json", "-multi", "1024", str(path)], + check=True, + capture_output=True, + ) + except Exception as error: + raise OSError(error) + + try: + id_result: dict = json.loads(sf_proc.stdout) + except Exception as error: + raise OSError(error) + + return id_result + +def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: + """Update a file Identification data model with an PUID and signature given as a dictionary. + + It is primarily used by the `costum_id` method. + Also adds a 'Extension Mismatch' warning if the extension of the file is not as we excpect from the given dict. + + Args: + path (Path): Path to the file + file_id (Identification): File identification data model + signature (dict[str, str]): Dictionary with new values for PUID and signature. + """ + file_id.puid = signature["puid"] + file_id.signature = signature["signature"] + if path.suffix.lower() != signature["extension"].lower(): + file_id.warning = "Extension mismatch" + else: + file_id.warning = None From 2a6f926a9949dd88736ed8eb62423df82e9d7245 Mon Sep 17 00:00:00 2001 From: Magnus Lindholm Date: Tue, 10 Oct 2023 14:43:06 +0200 Subject: [PATCH 2/7] linting --- acacore/database/files_db.py | 18 +- acacore/reference_files/__init__.py | 2 + acacore/reference_files/ref_files.py | 46 +++++ acacore/siegfried_utils/__init__.py | 2 + acacore/siegfried_utils/identify.py | 250 +++++++++++++++++++++++++++ 5 files changed, 307 insertions(+), 11 deletions(-) create mode 100644 acacore/reference_files/__init__.py create mode 100644 acacore/reference_files/ref_files.py create mode 100644 acacore/siegfried_utils/__init__.py create mode 100644 acacore/siegfried_utils/identify.py diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index f644748..217a7b1 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,16 +1,12 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Any -from typing import Optional -from typing import Type -from typing import Union +from typing import Any, Optional, Type, Union from uuid import UUID from acacore.utils.functions import or_none -from .base import Column -from .base import FileDBBase -from .base import SelectColumn + +from .base import Column, FileDBBase, SelectColumn class FileDB(FileDBBase): @@ -46,9 +42,9 @@ def __init__( uri: If set to True, database is interpreted as a URI with a file path and an optional query string. 
""" from acacore.models.file import ConvertedFile, File + from acacore.models.history import HistoryEntry from acacore.models.identification import SignatureCount from acacore.models.metadata import Metadata - from acacore.models.history import HistoryEntry super().__init__( database, @@ -137,7 +133,7 @@ def add_history( self, uuid: UUID, operation: str, - data: Any, + data: Any, # noqa: ANN401 reason: Optional[str] = None, *, time: Optional[datetime] = None, @@ -148,6 +144,6 @@ def add_history( operation=operation, data=data, reason=reason, - time=time or datetime.now(), - ) + time=time or datetime.now(tz=None), # noqa: DTZ005 + ), ) diff --git a/acacore/reference_files/__init__.py b/acacore/reference_files/__init__.py new file mode 100644 index 0000000..b82916f --- /dev/null +++ b/acacore/reference_files/__init__.py @@ -0,0 +1,2 @@ +"""Collection of methods that allows us to """ +from . import ref_files diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py new file mode 100644 index 0000000..0098d59 --- /dev/null +++ b/acacore/reference_files/ref_files.py @@ -0,0 +1,46 @@ +import json +from functools import lru_cache +from http.client import HTTPResponse +from urllib import request + + +@lru_cache +def to_re_identify() -> dict[str, str]: + """Gets the json file with the different formats that we wish to reidentify. + + Is kept updated on the reference-files repo. The function caches the result, + soo multiple calls in the same run should not be an issue. + """ + response: HTTPResponse = request.urlopen( + "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json", + ) + if response.getcode() != 200: + raise ConnectionError + + re_identify_map: dict[str, str] = json.loads(response.read()) + + if re_identify_map is None: + raise ConnectionError + + return re_identify_map + + +@lru_cache +def costum_sigs() -> list[dict]: + """Gets the json file with our own costum formats in a list. + + Is kept updated on the reference-files repo. The function caches the result, + soo multiple calls in the same run should not be an issue. + """ + response: HTTPResponse = request.urlopen( + "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json", + ) + if response.getcode() != 200: + raise ConnectionError + + re_identify_map: dict[str, str] = json.loads(response.read()) + + if re_identify_map is None: + raise ConnectionError + + return re_identify_map diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py new file mode 100644 index 0000000..d9e40a4 --- /dev/null +++ b/acacore/siegfried_utils/__init__.py @@ -0,0 +1,2 @@ +"""Place for functions and classes that act as an entrance to siegfried""" +from . import identify diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py new file mode 100644 index 0000000..2d3a34a --- /dev/null +++ b/acacore/siegfried_utils/identify.py @@ -0,0 +1,250 @@ +import json +import re +import subprocess +from logging import Logger +from pathlib import Path +from typing import Any, Optional, Tuple + +from acacore.models.identification import Identification +from acacore.reference_files.ref_files import costum_sigs, to_re_identify + + +def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: + """Uses the BOF and EOF to try to determine a ACAUID for a file and update its Identification datastructure. 
+ + If none can be found, simply return the same Identification data structure as it got in the beginning. + + Args: + path (Path): PAth to the file + file_id (Identification): The file identification data structure that should be updated with the new values + + Returns: + Identification: The updated file data structure. + """ + bof, eof = get_bof_and_eof(path) + + sig_for_file = get_aca_signature(bof, eof) + if sig_for_file: + update_file_id(path, file_id, sig_for_file) + return file_id + + +def aca_id(path: Path) -> Optional[str]: + """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. + + Args: + path (Path): Path to the file to be examined + + Returns: + Optional[str]: Possible ACAUID + """ + bof, eof = get_bof_and_eof(path) + + sig_for_file = get_aca_signature(bof, eof) + if not sig_for_file: + return None + + return sig_for_file.get("puid", None) + + +def get_bof_and_eof(file: Path) -> Tuple[str, str]: + """Get the first and last kilobyte of a file. + + Args: + file (Path): Path to file + + Returns: + Tuple[str,str]: BOF and then EOF as `str`. + """ + with file.open("rb") as file_bytes: + # BOF + bof = file_bytes.read(1024).hex() + # Navigate to EOF + try: + file_bytes.seek(-1024, 2) + except OSError: + # File too small :) + file_bytes.seek(-file_bytes.tell(), 2) + eof = file_bytes.read(1024).hex() + return (bof, eof) + + +def get_aca_signature(bof: str, eof: str) -> Optional[dict]: + """Get the ACA signature of a file type, if one exists. Else return `None`. + + Args: + bof (str): The first kilobyte of a file + eof (str): The last kilobyte of a file + + Returns: + Optional(str): The signature, if one was found. + """ + aca_signatures: list[dict] = costum_sigs() + for sig in aca_signatures: + if "bof" in sig and "eof" in sig: + bof_pattern = re.compile(sig["bof"]) + eof_pattern = re.compile(sig["eof"]) + if sig["operator"] == "OR": + if bof_pattern.search(bof) or eof_pattern.search(eof): + return sig + elif sig["operator"] == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): + return sig + elif "bof" in sig: + bof_pattern = re.compile(sig["bof"]) + if bof_pattern.search(bof): + return sig + elif "eof" in sig: + eof_pattern = re.compile(sig["eof"]) + if eof_pattern.search(eof): + return sig + return None + + +def sf_id_puid(path: Path) -> Optional[str]: + """Identify a file and return only its PUID using siegfried. + + Args: + path (`Path`): Path to the file to identify. + + Returns: + Optional[str]: The PUID of the file, or `None` if none is found + """ + id_result = run_sf_and_get_results_json(path) + + puid: Optional[str] = None + for file_result in id_result.get("files", []): + match: dict = {} + for id_match in file_result.get("matches"): + if id_match.get("ns") == "pronom": + match = id_match + if match: + puid = None if match.get("id", "").lower() == "unknown" else match.get("id") + break + return puid + + +def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]: + """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained. + + Also updates FileInfo with obtained PUID, signature name, and warning if applicable. + + Parameters + ---------- + path : pathlib.Path + Path in which to identify files. + + Returns: + ------- + Dict[Path, Identification] + Dictionary containing file path and associated identification + information obtained from siegfried's stdout. 
+ + """ + id_dict: dict[Path, Identification] = {} + + id_result = run_sf_and_get_results_json(path) + + # We get identifiers as a list containing the ditionary, + # soo we have to get the one element our of it + results_dict: Optional[dict] = id_result.get("identifiers", None)[0] + if results_dict and log: + DROID_file_version: Optional[str] = results_dict.get("details") + log.info( + "Running sf with the following version of DROID: " + DROID_file_version if DROID_file_version else "", + ) + for file_result in id_result.get("files", []): + match: dict[str, Any] = {} + for id_match in file_result.get("matches"): + if id_match.get("ns") == "pronom": + match = id_match + if match: + file_identification: Identification + file_path: Path = Path(file_result["filename"]) + + puid = None if match.get("id", "").lower() == "unknown" else match.get("id") + + signature_and_version = None + signature = match.get("format") + version = match.get("version") + if signature: + signature_and_version = f"{signature} ({version})" + warning: str = match.get("warning", "").capitalize() + file_size: int = file_result.get("filesize") + file_errors: Optional[str] = file_result.get("errors", None) + if file_errors: + warning = warning + " ; Errors: " + file_errors + file_identification = Identification( + puid=puid, + signature=signature_and_version or None, + warning=warning or None, + size=file_size, + ) + + # unindentified files + if puid is None: + file_identification = aca_id_for_file_id(file_path, file_identification) + + # re-identify files, warnings or not! + if puid in to_re_identify(): + file_identification = aca_id_for_file_id(file_path, file_identification) + + # Possible MS Office files identified as markup (XML, HTML etc.) + if puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] and "Extension mismatch" in warning: + file_identification = aca_id_for_file_id(file_path, file_identification) + + id_dict.update({file_path: file_identification}) + + return id_dict + + +# --- +# Aux. methods, used as helper methods for the rest of the methods. +# --- + + +def run_sf_and_get_results_json(path: Path) -> dict: + """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. + + Args: + path (Path): A path to a folder containg files or subfolder with files (or more subfolders! ) + + Raises: + OSError: If there is an error with running siegfried or loading the results as a .json file, raises OSError + + Returns: + dict: dictionary created from .json output of siegfried + """ + try: + sf_proc = subprocess.run( + ["sf", "-json", "-multi", "1024", str(path)], + check=True, + capture_output=True, + ) + except Exception as error: + raise OSError(error) + + try: + id_result: dict = json.loads(sf_proc.stdout) + except Exception as error: + raise OSError(error) + + return id_result + + +def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: + """Update a file Identification data model with an PUID and signature given as a dictionary. + + It is primarily used by the `costum_id` method. + Also adds a 'Extension Mismatch' warning if the extension of the file is not as we excpect from the given dict. + + Args: + path (Path): Path to the file + file_id (Identification): File identification data model + signature (dict[str, str]): Dictionary with new values for PUID and signature. 
+ """ + file_id.puid = signature["puid"] + file_id.signature = signature["signature"] + if path.suffix.lower() != signature["extension"].lower(): + file_id.warning = "Extension mismatch" + else: + file_id.warning = None From 5bc89a303b8fc16d28f537becf8df9965303d702 Mon Sep 17 00:00:00 2001 From: Magnus Lindholm Date: Tue, 10 Oct 2023 15:01:59 +0200 Subject: [PATCH 3/7] Revert "linting" This reverts commit 2a6f926a9949dd88736ed8eb62423df82e9d7245. --- acacore/database/files_db.py | 18 +- acacore/reference_files/__init__.py | 2 - acacore/reference_files/ref_files.py | 46 ----- acacore/siegfried_utils/__init__.py | 2 - acacore/siegfried_utils/identify.py | 250 --------------------------- 5 files changed, 11 insertions(+), 307 deletions(-) delete mode 100644 acacore/reference_files/__init__.py delete mode 100644 acacore/reference_files/ref_files.py delete mode 100644 acacore/siegfried_utils/__init__.py delete mode 100644 acacore/siegfried_utils/identify.py diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index 217a7b1..f644748 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,12 +1,16 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Any, Optional, Type, Union +from typing import Any +from typing import Optional +from typing import Type +from typing import Union from uuid import UUID from acacore.utils.functions import or_none - -from .base import Column, FileDBBase, SelectColumn +from .base import Column +from .base import FileDBBase +from .base import SelectColumn class FileDB(FileDBBase): @@ -42,9 +46,9 @@ def __init__( uri: If set to True, database is interpreted as a URI with a file path and an optional query string. """ from acacore.models.file import ConvertedFile, File - from acacore.models.history import HistoryEntry from acacore.models.identification import SignatureCount from acacore.models.metadata import Metadata + from acacore.models.history import HistoryEntry super().__init__( database, @@ -133,7 +137,7 @@ def add_history( self, uuid: UUID, operation: str, - data: Any, # noqa: ANN401 + data: Any, reason: Optional[str] = None, *, time: Optional[datetime] = None, @@ -144,6 +148,6 @@ def add_history( operation=operation, data=data, reason=reason, - time=time or datetime.now(tz=None), # noqa: DTZ005 - ), + time=time or datetime.now(), + ) ) diff --git a/acacore/reference_files/__init__.py b/acacore/reference_files/__init__.py deleted file mode 100644 index b82916f..0000000 --- a/acacore/reference_files/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Collection of methods that allows us to """ -from . import ref_files diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py deleted file mode 100644 index 0098d59..0000000 --- a/acacore/reference_files/ref_files.py +++ /dev/null @@ -1,46 +0,0 @@ -import json -from functools import lru_cache -from http.client import HTTPResponse -from urllib import request - - -@lru_cache -def to_re_identify() -> dict[str, str]: - """Gets the json file with the different formats that we wish to reidentify. - - Is kept updated on the reference-files repo. The function caches the result, - soo multiple calls in the same run should not be an issue. 
- """ - response: HTTPResponse = request.urlopen( - "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json", - ) - if response.getcode() != 200: - raise ConnectionError - - re_identify_map: dict[str, str] = json.loads(response.read()) - - if re_identify_map is None: - raise ConnectionError - - return re_identify_map - - -@lru_cache -def costum_sigs() -> list[dict]: - """Gets the json file with our own costum formats in a list. - - Is kept updated on the reference-files repo. The function caches the result, - soo multiple calls in the same run should not be an issue. - """ - response: HTTPResponse = request.urlopen( - "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json", - ) - if response.getcode() != 200: - raise ConnectionError - - re_identify_map: dict[str, str] = json.loads(response.read()) - - if re_identify_map is None: - raise ConnectionError - - return re_identify_map diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py deleted file mode 100644 index d9e40a4..0000000 --- a/acacore/siegfried_utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Place for functions and classes that act as an entrance to siegfried""" -from . import identify diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py deleted file mode 100644 index 2d3a34a..0000000 --- a/acacore/siegfried_utils/identify.py +++ /dev/null @@ -1,250 +0,0 @@ -import json -import re -import subprocess -from logging import Logger -from pathlib import Path -from typing import Any, Optional, Tuple - -from acacore.models.identification import Identification -from acacore.reference_files.ref_files import costum_sigs, to_re_identify - - -def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: - """Uses the BOF and EOF to try to determine a ACAUID for a file and update its Identification datastructure. - - If none can be found, simply return the same Identification data structure as it got in the beginning. - - Args: - path (Path): PAth to the file - file_id (Identification): The file identification data structure that should be updated with the new values - - Returns: - Identification: The updated file data structure. - """ - bof, eof = get_bof_and_eof(path) - - sig_for_file = get_aca_signature(bof, eof) - if sig_for_file: - update_file_id(path, file_id, sig_for_file) - return file_id - - -def aca_id(path: Path) -> Optional[str]: - """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. - - Args: - path (Path): Path to the file to be examined - - Returns: - Optional[str]: Possible ACAUID - """ - bof, eof = get_bof_and_eof(path) - - sig_for_file = get_aca_signature(bof, eof) - if not sig_for_file: - return None - - return sig_for_file.get("puid", None) - - -def get_bof_and_eof(file: Path) -> Tuple[str, str]: - """Get the first and last kilobyte of a file. - - Args: - file (Path): Path to file - - Returns: - Tuple[str,str]: BOF and then EOF as `str`. - """ - with file.open("rb") as file_bytes: - # BOF - bof = file_bytes.read(1024).hex() - # Navigate to EOF - try: - file_bytes.seek(-1024, 2) - except OSError: - # File too small :) - file_bytes.seek(-file_bytes.tell(), 2) - eof = file_bytes.read(1024).hex() - return (bof, eof) - - -def get_aca_signature(bof: str, eof: str) -> Optional[dict]: - """Get the ACA signature of a file type, if one exists. Else return `None`. 
- - Args: - bof (str): The first kilobyte of a file - eof (str): The last kilobyte of a file - - Returns: - Optional(str): The signature, if one was found. - """ - aca_signatures: list[dict] = costum_sigs() - for sig in aca_signatures: - if "bof" in sig and "eof" in sig: - bof_pattern = re.compile(sig["bof"]) - eof_pattern = re.compile(sig["eof"]) - if sig["operator"] == "OR": - if bof_pattern.search(bof) or eof_pattern.search(eof): - return sig - elif sig["operator"] == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): - return sig - elif "bof" in sig: - bof_pattern = re.compile(sig["bof"]) - if bof_pattern.search(bof): - return sig - elif "eof" in sig: - eof_pattern = re.compile(sig["eof"]) - if eof_pattern.search(eof): - return sig - return None - - -def sf_id_puid(path: Path) -> Optional[str]: - """Identify a file and return only its PUID using siegfried. - - Args: - path (`Path`): Path to the file to identify. - - Returns: - Optional[str]: The PUID of the file, or `None` if none is found - """ - id_result = run_sf_and_get_results_json(path) - - puid: Optional[str] = None - for file_result in id_result.get("files", []): - match: dict = {} - for id_match in file_result.get("matches"): - if id_match.get("ns") == "pronom": - match = id_match - if match: - puid = None if match.get("id", "").lower() == "unknown" else match.get("id") - break - return puid - - -def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]: - """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained. - - Also updates FileInfo with obtained PUID, signature name, and warning if applicable. - - Parameters - ---------- - path : pathlib.Path - Path in which to identify files. - - Returns: - ------- - Dict[Path, Identification] - Dictionary containing file path and associated identification - information obtained from siegfried's stdout. - - """ - id_dict: dict[Path, Identification] = {} - - id_result = run_sf_and_get_results_json(path) - - # We get identifiers as a list containing the ditionary, - # soo we have to get the one element our of it - results_dict: Optional[dict] = id_result.get("identifiers", None)[0] - if results_dict and log: - DROID_file_version: Optional[str] = results_dict.get("details") - log.info( - "Running sf with the following version of DROID: " + DROID_file_version if DROID_file_version else "", - ) - for file_result in id_result.get("files", []): - match: dict[str, Any] = {} - for id_match in file_result.get("matches"): - if id_match.get("ns") == "pronom": - match = id_match - if match: - file_identification: Identification - file_path: Path = Path(file_result["filename"]) - - puid = None if match.get("id", "").lower() == "unknown" else match.get("id") - - signature_and_version = None - signature = match.get("format") - version = match.get("version") - if signature: - signature_and_version = f"{signature} ({version})" - warning: str = match.get("warning", "").capitalize() - file_size: int = file_result.get("filesize") - file_errors: Optional[str] = file_result.get("errors", None) - if file_errors: - warning = warning + " ; Errors: " + file_errors - file_identification = Identification( - puid=puid, - signature=signature_and_version or None, - warning=warning or None, - size=file_size, - ) - - # unindentified files - if puid is None: - file_identification = aca_id_for_file_id(file_path, file_identification) - - # re-identify files, warnings or not! 
- if puid in to_re_identify(): - file_identification = aca_id_for_file_id(file_path, file_identification) - - # Possible MS Office files identified as markup (XML, HTML etc.) - if puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] and "Extension mismatch" in warning: - file_identification = aca_id_for_file_id(file_path, file_identification) - - id_dict.update({file_path: file_identification}) - - return id_dict - - -# --- -# Aux. methods, used as helper methods for the rest of the methods. -# --- - - -def run_sf_and_get_results_json(path: Path) -> dict: - """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. - - Args: - path (Path): A path to a folder containg files or subfolder with files (or more subfolders! ) - - Raises: - OSError: If there is an error with running siegfried or loading the results as a .json file, raises OSError - - Returns: - dict: dictionary created from .json output of siegfried - """ - try: - sf_proc = subprocess.run( - ["sf", "-json", "-multi", "1024", str(path)], - check=True, - capture_output=True, - ) - except Exception as error: - raise OSError(error) - - try: - id_result: dict = json.loads(sf_proc.stdout) - except Exception as error: - raise OSError(error) - - return id_result - - -def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: - """Update a file Identification data model with an PUID and signature given as a dictionary. - - It is primarily used by the `costum_id` method. - Also adds a 'Extension Mismatch' warning if the extension of the file is not as we excpect from the given dict. - - Args: - path (Path): Path to the file - file_id (Identification): File identification data model - signature (dict[str, str]): Dictionary with new values for PUID and signature. - """ - file_id.puid = signature["puid"] - file_id.signature = signature["signature"] - if path.suffix.lower() != signature["extension"].lower(): - file_id.warning = "Extension mismatch" - else: - file_id.warning = None From f8d5651681d51c730cedc62ed30fee7d90d15fa0 Mon Sep 17 00:00:00 2001 From: Magnus Lindholm Date: Tue, 10 Oct 2023 15:03:48 +0200 Subject: [PATCH 4/7] linting --- acacore/database/files_db.py | 18 +++++++----------- acacore/reference_files/__init__.py | 2 +- acacore/reference_files/ref_files.py | 4 +++- acacore/siegfried_utils/__init__.py | 2 +- acacore/siegfried_utils/identify.py | 19 ++++++++++--------- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index f644748..2a2742a 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,16 +1,12 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Any -from typing import Optional -from typing import Type -from typing import Union +from typing import Any, Optional, Type, Union from uuid import UUID from acacore.utils.functions import or_none -from .base import Column -from .base import FileDBBase -from .base import SelectColumn + +from .base import Column, FileDBBase, SelectColumn class FileDB(FileDBBase): @@ -46,9 +42,9 @@ def __init__( uri: If set to True, database is interpreted as a URI with a file path and an optional query string. 
""" from acacore.models.file import ConvertedFile, File + from acacore.models.history import HistoryEntry from acacore.models.identification import SignatureCount from acacore.models.metadata import Metadata - from acacore.models.history import HistoryEntry super().__init__( database, @@ -137,7 +133,7 @@ def add_history( self, uuid: UUID, operation: str, - data: Any, + data: Any, # noqa: ANN401 reason: Optional[str] = None, *, time: Optional[datetime] = None, @@ -148,6 +144,6 @@ def add_history( operation=operation, data=data, reason=reason, - time=time or datetime.now(), - ) + time=time or datetime.now(), # noqa: DTZ005 + ), ) diff --git a/acacore/reference_files/__init__.py b/acacore/reference_files/__init__.py index c603535..b82916f 100644 --- a/acacore/reference_files/__init__.py +++ b/acacore/reference_files/__init__.py @@ -1,2 +1,2 @@ """Collection of methods that allows us to """ -from . import ref_files \ No newline at end of file +from . import ref_files diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py index 50456bd..0098d59 100644 --- a/acacore/reference_files/ref_files.py +++ b/acacore/reference_files/ref_files.py @@ -11,7 +11,9 @@ def to_re_identify() -> dict[str, str]: Is kept updated on the reference-files repo. The function caches the result, soo multiple calls in the same run should not be an issue. """ - response: HTTPResponse = request.urlopen("https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json") + response: HTTPResponse = request.urlopen( + "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json", + ) if response.getcode() != 200: raise ConnectionError diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py index b0f30ed..d9e40a4 100644 --- a/acacore/siegfried_utils/__init__.py +++ b/acacore/siegfried_utils/__init__.py @@ -1,2 +1,2 @@ """Place for functions and classes that act as an entrance to siegfried""" -from . import identify \ No newline at end of file +from . import identify diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py index 6425bad..2d3a34a 100644 --- a/acacore/siegfried_utils/identify.py +++ b/acacore/siegfried_utils/identify.py @@ -28,6 +28,7 @@ def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: update_file_id(path, file_id, sig_for_file) return file_id + def aca_id(path: Path) -> Optional[str]: """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. @@ -45,7 +46,8 @@ def aca_id(path: Path) -> Optional[str]: return sig_for_file.get("puid", None) -def get_bof_and_eof(file: Path) -> Tuple[str,str]: + +def get_bof_and_eof(file: Path) -> Tuple[str, str]: """Get the first and last kilobyte of a file. Args: @@ -66,6 +68,7 @@ def get_bof_and_eof(file: Path) -> Tuple[str,str]: eof = file_bytes.read(1024).hex() return (bof, eof) + def get_aca_signature(bof: str, eof: str) -> Optional[dict]: """Get the ACA signature of a file type, if one exists. Else return `None`. @@ -96,6 +99,7 @@ def get_aca_signature(bof: str, eof: str) -> Optional[dict]: return sig return None + def sf_id_puid(path: Path) -> Optional[str]: """Identify a file and return only its PUID using siegfried. 
@@ -140,16 +144,13 @@ def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identific id_result = run_sf_and_get_results_json(path) - # We get identifiers as a list containing the ditionary, # soo we have to get the one element our of it results_dict: Optional[dict] = id_result.get("identifiers", None)[0] if results_dict and log: DROID_file_version: Optional[str] = results_dict.get("details") log.info( - "Running sf with the following version of DROID: " + DROID_file_version - if DROID_file_version - else "", + "Running sf with the following version of DROID: " + DROID_file_version if DROID_file_version else "", ) for file_result in id_result.get("files", []): match: dict[str, Any] = {} @@ -188,20 +189,19 @@ def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identific file_identification = aca_id_for_file_id(file_path, file_identification) # Possible MS Office files identified as markup (XML, HTML etc.) - if ( - puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] - and "Extension mismatch" in warning - ): + if puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] and "Extension mismatch" in warning: file_identification = aca_id_for_file_id(file_path, file_identification) id_dict.update({file_path: file_identification}) return id_dict + # --- # Aux. methods, used as helper methods for the rest of the methods. # --- + def run_sf_and_get_results_json(path: Path) -> dict: """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. @@ -230,6 +230,7 @@ def run_sf_and_get_results_json(path: Path) -> dict: return id_result + def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: """Update a file Identification data model with an PUID and signature given as a dictionary. 
From ff523178077d117e69ccbe154a2da2d1fd96ba29 Mon Sep 17 00:00:00 2001 From: Matteo Campinoti Date: Mon, 16 Oct 2023 14:31:55 +0200 Subject: [PATCH 5/7] acacore - format with black --- acacore/database/base.py | 16 +++++++++++++--- acacore/database/column.py | 7 ++++++- acacore/database/files_db.py | 10 +++++++--- acacore/models/file.py | 5 +++-- acacore/models/file_data.py | 5 +++-- acacore/models/history.py | 3 ++- acacore/models/identification.py | 3 ++- acacore/siegfried_utils/identify.py | 7 +++++-- acacore/utils/functions.py | 4 +++- acacore/utils/log.py | 6 +++++- 10 files changed, 49 insertions(+), 17 deletions(-) diff --git a/acacore/database/base.py b/acacore/database/base.py index 6bee159..d95857a 100644 --- a/acacore/database/base.py +++ b/acacore/database/base.py @@ -1,13 +1,23 @@ from datetime import datetime from os import PathLike from pathlib import Path -from sqlite3 import Connection, OperationalError +from sqlite3 import Connection from sqlite3 import Cursor as SQLiteCursor -from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload +from sqlite3 import OperationalError +from typing import Any +from typing import Generator +from typing import Generic +from typing import Optional +from typing import Type +from typing import TypeVar +from typing import Union +from typing import overload from pydantic.main import BaseModel -from .column import Column, SelectColumn, model_to_columns +from .column import Column +from .column import SelectColumn +from .column import model_to_columns T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/database/column.py b/acacore/database/column.py index 249c452..acc06dd 100644 --- a/acacore/database/column.py +++ b/acacore/database/column.py @@ -1,6 +1,11 @@ from datetime import datetime from pathlib import Path -from typing import Callable, Generic, Optional, Type, TypeVar, Union +from typing import Callable +from typing import Generic +from typing import Optional +from typing import Type +from typing import TypeVar +from typing import Union from uuid import UUID from pydantic import BaseModel diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index 2a2742a..f9ec567 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,12 +1,16 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Any, Optional, Type, Union +from typing import Any +from typing import Optional +from typing import Type +from typing import Union from uuid import UUID from acacore.utils.functions import or_none - -from .base import Column, FileDBBase, SelectColumn +from .base import Column +from .base import FileDBBase +from .base import SelectColumn class FileDB(FileDBBase): diff --git a/acacore/models/file.py b/acacore/models/file.py index a82d9ae..d1af288 100644 --- a/acacore/models/file.py +++ b/acacore/models/file.py @@ -4,13 +4,14 @@ from pathlib import Path from typing import Optional -from pydantic import UUID4, Field +from pydantic import Field +from pydantic import UUID4 from acacore.utils.io import size_fmt - from .base import ACABase from .identification import Identification + # ----------------------------------------------------------------------------- # Model # ----------------------------------------------------------------------------- diff --git a/acacore/models/file_data.py b/acacore/models/file_data.py index bfe76c3..6f918d8 100644 --- a/acacore/models/file_data.py +++ b/acacore/models/file_data.py @@ -1,10 +1,11 @@ from 
pathlib import Path -from typing import Any, ClassVar, Optional +from typing import Any +from typing import ClassVar +from typing import Optional from pydantic import model_validator from acacore.database.files_db import FileDB - from .base import ACABase from .file import ArchiveFile diff --git a/acacore/models/history.py b/acacore/models/history.py index e6e458d..a8e7e80 100644 --- a/acacore/models/history.py +++ b/acacore/models/history.py @@ -1,5 +1,6 @@ from datetime import datetime -from typing import Optional, Union +from typing import Optional +from typing import Union from pydantic import UUID4 diff --git a/acacore/models/identification.py b/acacore/models/identification.py index 9effa0b..a5c4e60 100644 --- a/acacore/models/identification.py +++ b/acacore/models/identification.py @@ -1,4 +1,5 @@ -from typing import Any, Optional +from typing import Any +from typing import Optional from pydantic import model_validator diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py index 2d3a34a..48880d4 100644 --- a/acacore/siegfried_utils/identify.py +++ b/acacore/siegfried_utils/identify.py @@ -3,10 +3,13 @@ import subprocess from logging import Logger from pathlib import Path -from typing import Any, Optional, Tuple +from typing import Any +from typing import Optional +from typing import Tuple from acacore.models.identification import Identification -from acacore.reference_files.ref_files import costum_sigs, to_re_identify +from acacore.reference_files.ref_files import costum_sigs +from acacore.reference_files.ref_files import to_re_identify def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: diff --git a/acacore/utils/functions.py b/acacore/utils/functions.py index e72e482..6aa828e 100644 --- a/acacore/utils/functions.py +++ b/acacore/utils/functions.py @@ -1,4 +1,6 @@ -from typing import Callable, Optional, TypeVar +from typing import Callable +from typing import Optional +from typing import TypeVar T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/utils/log.py b/acacore/utils/log.py index e7a0f56..1f28c8f 100644 --- a/acacore/utils/log.py +++ b/acacore/utils/log.py @@ -1,4 +1,8 @@ -from logging import INFO, FileHandler, Formatter, Logger, getLogger +from logging import FileHandler +from logging import Formatter +from logging import INFO +from logging import Logger +from logging import getLogger from pathlib import Path From 27fd8b71cb51a17c6e066975ad1367fb0bddaeeb Mon Sep 17 00:00:00 2001 From: Magnus Lindholm Date: Tue, 17 Oct 2023 09:29:06 +0200 Subject: [PATCH 6/7] start of review --- acacore/siegfried_utils/__init__.py | 2 -- acacore/siegfried_utils/identify.py | 30 ++--------------------------- 2 files changed, 2 insertions(+), 30 deletions(-) diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py index d9e40a4..e69de29 100644 --- a/acacore/siegfried_utils/__init__.py +++ b/acacore/siegfried_utils/__init__.py @@ -1,2 +0,0 @@ -"""Place for functions and classes that act as an entrance to siegfried""" -from . 
import identify
diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py
index 48880d4..46b08e9 100644
--- a/acacore/siegfried_utils/identify.py
+++ b/acacore/siegfried_utils/identify.py
@@ -3,13 +3,10 @@
 import subprocess
 from logging import Logger
 from pathlib import Path
-from typing import Any
-from typing import Optional
-from typing import Tuple
+from typing import Any, Optional, Tuple
 
 from acacore.models.identification import Identification
-from acacore.reference_files.ref_files import costum_sigs
-from acacore.reference_files.ref_files import to_re_identify
+from acacore.reference_files.ref_files import costum_sigs, to_re_identify
 
 
 def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification:
@@ -103,29 +100,6 @@ def get_aca_signature(bof: str, eof: str) -> Optional[dict]:
     return None
 
 
-def sf_id_puid(path: Path) -> Optional[str]:
-    """Identify a file and return only its PUID using siegfried.
-
-    Args:
-        path (`Path`): Path to the file to identify.
-
-    Returns:
-        Optional[str]: The PUID of the file, or `None` if none is found
-    """
-    id_result = run_sf_and_get_results_json(path)
-
-    puid: Optional[str] = None
-    for file_result in id_result.get("files", []):
-        match: dict = {}
-        for id_match in file_result.get("matches"):
-            if id_match.get("ns") == "pronom":
-                match = id_match
-        if match:
-            puid = None if match.get("id", "").lower() == "unknown" else match.get("id")
-            break
-    return puid
-
-
 def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]:
     """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained.

From 874fed79453b5c43a9eb7cbe01bf289999a3e3fc Mon Sep 17 00:00:00 2001
From: Magnus Lindholm
Date: Tue, 17 Oct 2023 11:23:29 +0200
Subject: [PATCH 7/7] refactored my siegfried implementation to be compatible
 with the one on main: now only the methods that handle ACA identification
 remain, and they have been moved to the File class.
* Also added datamodels to check the data we get from reference files --- acacore/database/base.py | 16 +- acacore/database/column.py | 7 +- acacore/database/files_db.py | 9 +- acacore/models/__init__.py | 1 + acacore/models/file.py | 76 ++++++++- acacore/models/file_data.py | 5 +- acacore/models/history.py | 3 +- acacore/models/identification.py | 3 +- acacore/models/reference_files.py | 24 +++ acacore/reference_files/ref_files.py | 25 ++- acacore/siegfried/siegfried.py | 35 ++--- acacore/siegfried_utils/__init__.py | 0 acacore/siegfried_utils/identify.py | 227 --------------------------- acacore/utils/functions.py | 4 +- acacore/utils/log.py | 7 +- 15 files changed, 145 insertions(+), 297 deletions(-) create mode 100644 acacore/models/reference_files.py delete mode 100644 acacore/siegfried_utils/__init__.py delete mode 100644 acacore/siegfried_utils/identify.py diff --git a/acacore/database/base.py b/acacore/database/base.py index d95857a..6bee159 100644 --- a/acacore/database/base.py +++ b/acacore/database/base.py @@ -1,23 +1,13 @@ from datetime import datetime from os import PathLike from pathlib import Path -from sqlite3 import Connection +from sqlite3 import Connection, OperationalError from sqlite3 import Cursor as SQLiteCursor -from sqlite3 import OperationalError -from typing import Any -from typing import Generator -from typing import Generic -from typing import Optional -from typing import Type -from typing import TypeVar -from typing import Union -from typing import overload +from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload from pydantic.main import BaseModel -from .column import Column -from .column import SelectColumn -from .column import model_to_columns +from .column import Column, SelectColumn, model_to_columns T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/database/column.py b/acacore/database/column.py index acc06dd..249c452 100644 --- a/acacore/database/column.py +++ b/acacore/database/column.py @@ -1,11 +1,6 @@ from datetime import datetime from pathlib import Path -from typing import Callable -from typing import Generic -from typing import Optional -from typing import Type -from typing import TypeVar -from typing import Union +from typing import Callable, Generic, Optional, Type, TypeVar, Union from uuid import UUID from pydantic import BaseModel diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index 8cb9024..d36d21a 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,15 +1,12 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Optional -from typing import Type -from typing import Union +from typing import Optional, Type, Union from uuid import UUID from acacore.utils.functions import or_none -from .base import Column -from .base import FileDBBase -from .base import SelectColumn + +from .base import Column, FileDBBase, SelectColumn class FileDB(FileDBBase): diff --git a/acacore/models/__init__.py b/acacore/models/__init__.py index 7b1839e..eea3b30 100644 --- a/acacore/models/__init__.py +++ b/acacore/models/__init__.py @@ -3,3 +3,4 @@ from . import file_data from . import identification from . import metadata +from . 
import reference_files diff --git a/acacore/models/file.py b/acacore/models/file.py index d1af288..f81a47c 100644 --- a/acacore/models/file.py +++ b/acacore/models/file.py @@ -1,17 +1,19 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +import re from pathlib import Path -from typing import Optional +from typing import Optional, Tuple -from pydantic import Field -from pydantic import UUID4 +from pydantic import UUID4, Field +from acacore.models.reference_files import CustomSignature +from acacore.siegfried.siegfried import Siegfried, SiegfriedFile from acacore.utils.io import size_fmt + from .base import ACABase from .identification import Identification - # ----------------------------------------------------------------------------- # Model # ----------------------------------------------------------------------------- @@ -31,6 +33,50 @@ class File(ACABase): warning: Optional[str] = None action: Optional[str] = None + def identify(self, sf: Siegfried) -> SiegfriedFile: + """Identify the file using `siegfried`. + + Args: + sf (Siegfried): A Siegfried class object + + Returns: + SiegfriedFile: A dataclass object containing the results from the identification + """ + return sf.identify(self.get_absolute_path())[0] + + def re_identify_with_aca(self, costum_sigs: list[CustomSignature]) -> None: + """Uses the BOF and EOF to try to determine a ACAUID for the file. + + The costum_sigs list should be found on the `reference_files` repo. + If no match can be found, the method does nothing. + + Args: + costum_sigs: A list of the costum_signatures that the file should be checked against + """ + bof, eof = self.get_bof_and_eof() + # We have to go through all of the signatures in order to check their BOF en EOF with the file. + for sig in costum_sigs: + if sig.bof and sig.eof: + bof_pattern = re.compile(sig.bof) + eof_pattern = re.compile(sig.eof) + if sig.operator == "OR": + if bof_pattern.search(bof) or eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.operator == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.bof: + bof_pattern = re.compile(sig.bof) + if bof_pattern.search(bof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.eof: + eof_pattern = re.compile(sig.eof) + if eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + def get_absolute_path(self, root: Optional[Path] = None) -> Path: return root.joinpath(self.relative_path) if root else self.relative_path.resolve() @@ -96,6 +142,28 @@ def size_fmt(self) -> str: """ return str(size_fmt(self.get_absolute_path().stat().st_size)) + def get_bof_and_eof(self) -> Tuple[str, str]: + """Get the first and last kilobyte of the file. + + Args: + file (Path): Path to file + + Returns: + Tuple[str,str]: BOF and then EOF as `str`. 
+ """ + file = self.get_absolute_path() + with file.open("rb") as file_bytes: + # BOF + bof = file_bytes.read(1024).hex() + # Navigate to EOF + try: + file_bytes.seek(-1024, 2) + except OSError: + # File too small :) + file_bytes.seek(-file_bytes.tell(), 2) + eof = file_bytes.read(1024).hex() + return (bof, eof) + class ArchiveFile(Identification, File): """ArchiveFile data model.""" diff --git a/acacore/models/file_data.py b/acacore/models/file_data.py index 6f918d8..bfe76c3 100644 --- a/acacore/models/file_data.py +++ b/acacore/models/file_data.py @@ -1,11 +1,10 @@ from pathlib import Path -from typing import Any -from typing import ClassVar -from typing import Optional +from typing import Any, ClassVar, Optional from pydantic import model_validator from acacore.database.files_db import FileDB + from .base import ACABase from .file import ArchiveFile diff --git a/acacore/models/history.py b/acacore/models/history.py index a8e7e80..e6e458d 100644 --- a/acacore/models/history.py +++ b/acacore/models/history.py @@ -1,6 +1,5 @@ from datetime import datetime -from typing import Optional -from typing import Union +from typing import Optional, Union from pydantic import UUID4 diff --git a/acacore/models/identification.py b/acacore/models/identification.py index a5c4e60..9effa0b 100644 --- a/acacore/models/identification.py +++ b/acacore/models/identification.py @@ -1,5 +1,4 @@ -from typing import Any -from typing import Optional +from typing import Any, Optional from pydantic import model_validator diff --git a/acacore/models/reference_files.py b/acacore/models/reference_files.py new file mode 100644 index 0000000..9681653 --- /dev/null +++ b/acacore/models/reference_files.py @@ -0,0 +1,24 @@ +"""Data models for the data on saved to different .json files on the `reference_files` repo.""" +from typing import Optional + +from pydantic import BaseModel + + +class ReIdentifyModel(BaseModel): + """Data model for the `to_reidentify` from reference_files.""" + + puid: Optional[str] = None + name: Optional[str] = None + ext: Optional[str] = None + reasoning: Optional[str] = None + + +class CustomSignature(BaseModel): + """Data model for the `costum_signatures` from reference_files.""" + + bof: Optional[str] = None + eof: Optional[str] = None + operator: Optional[str] = None + puid: Optional[str] = None + signature: Optional[str] = None + extension: Optional[str] = None diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py index 0098d59..6fbcbcc 100644 --- a/acacore/reference_files/ref_files.py +++ b/acacore/reference_files/ref_files.py @@ -3,9 +3,11 @@ from http.client import HTTPResponse from urllib import request +from models.reference_files import CustomSignature, ReIdentifyModel + @lru_cache -def to_re_identify() -> dict[str, str]: +def to_re_identify() -> list[ReIdentifyModel]: """Gets the json file with the different formats that we wish to reidentify. Is kept updated on the reference-files repo. The function caches the result, @@ -22,11 +24,16 @@ def to_re_identify() -> dict[str, str]: if re_identify_map is None: raise ConnectionError - return re_identify_map + result_list: list[ReIdentifyModel] = [] + for key, values in re_identify_map.items(): + result = ReIdentifyModel(puid=key, **values) + result_list.append(result) + + return result_list @lru_cache -def costum_sigs() -> list[dict]: +def costum_sigs() -> list[CustomSignature]: """Gets the json file with our own costum formats in a list. Is kept updated on the reference-files repo. 
The function caches the result, @@ -38,9 +45,15 @@ def costum_sigs() -> list[dict]: if response.getcode() != 200: raise ConnectionError - re_identify_map: dict[str, str] = json.loads(response.read()) + custom_list: list[dict] = json.loads(response.read()) - if re_identify_map is None: + if custom_list is None: raise ConnectionError - return re_identify_map + result_list: list[CustomSignature] = [] + + for values in custom_list: + result = CustomSignature(**values) + result_list.append(result) + + return result_list diff --git a/acacore/siegfried/siegfried.py b/acacore/siegfried/siegfried.py index 6edd25b..f46a688 100644 --- a/acacore/siegfried/siegfried.py +++ b/acacore/siegfried/siegfried.py @@ -1,15 +1,10 @@ from datetime import datetime from os import PathLike from pathlib import Path -from subprocess import CompletedProcess -from subprocess import run -from typing import Optional -from typing import Union +from subprocess import CompletedProcess, run +from typing import Optional, Union -from pydantic import BaseModel -from pydantic import ConfigDict -from pydantic import Field -from pydantic import field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator from acacore.exceptions.files import IdentificationError @@ -17,11 +12,11 @@ def _check_process(process: CompletedProcess): """ Raises: - IdentificationError: if the process ends with a return code other than 0 - """ + IdentificationError: if the process ends with a return code other than 0. + """ # noqa: D205 if process.returncode != 0: raise IdentificationError( - process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}" + process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}", ) @@ -32,8 +27,8 @@ class SiegfriedIdentifier(BaseModel): class SiegfriedMatch(BaseModel): ns: str - id: Optional[str] - format: str + id: Optional[str] # noqa: A003 + format: str # noqa: A003 version: str mime: str match_class: str = Field(alias="class") @@ -74,16 +69,16 @@ class Siegfried: https://github.com/richardlehane/siegfried """ - def __init__(self, binary: Union[str, PathLike] = "sf"): + def __init__(self, binary: Union[str, PathLike] = "sf") -> None: """ Args: - binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable + binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable. Raises: - IdentificationError: If Siegfried is not configured properly - """ + IdentificationError: If Siegfried is not configured properly. 
+ """ # noqa: D205 self.binary: str = str(binary) - _check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8")) + _check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8")) # noqa: PLW1510 def identify(self, path: Union[str, PathLike]) -> SiegfriedResult: """ @@ -102,6 +97,7 @@ def identify(self, path: Union[str, PathLike]) -> SiegfriedResult: [self.binary, "-json", "-multi", "1024", str(path)], capture_output=True, encoding="utf-8", + check=False, ) _check_process(process) try: @@ -117,7 +113,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]: paths: The paths to the files Returns: - A tuple of tuples joining the paths with their SiegfriedFile result + tuple[tuple[Path, SiegfriedFile]: A tuple of tuples joining the paths with their SiegfriedFile result Raises: IdentificationError: If there is an error calling Siegfried or processing its results @@ -126,6 +122,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]: [self.binary, "-json", "-multi", "1024", *map(str, paths)], capture_output=True, encoding="utf-8", + check=False, ) _check_process(process) try: diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py deleted file mode 100644 index 46b08e9..0000000 --- a/acacore/siegfried_utils/identify.py +++ /dev/null @@ -1,227 +0,0 @@ -import json -import re -import subprocess -from logging import Logger -from pathlib import Path -from typing import Any, Optional, Tuple - -from acacore.models.identification import Identification -from acacore.reference_files.ref_files import costum_sigs, to_re_identify - - -def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: - """Uses the BOF and EOF to try to determine a ACAUID for a file and update its Identification datastructure. - - If none can be found, simply return the same Identification data structure as it got in the beginning. - - Args: - path (Path): PAth to the file - file_id (Identification): The file identification data structure that should be updated with the new values - - Returns: - Identification: The updated file data structure. - """ - bof, eof = get_bof_and_eof(path) - - sig_for_file = get_aca_signature(bof, eof) - if sig_for_file: - update_file_id(path, file_id, sig_for_file) - return file_id - - -def aca_id(path: Path) -> Optional[str]: - """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. - - Args: - path (Path): Path to the file to be examined - - Returns: - Optional[str]: Possible ACAUID - """ - bof, eof = get_bof_and_eof(path) - - sig_for_file = get_aca_signature(bof, eof) - if not sig_for_file: - return None - - return sig_for_file.get("puid", None) - - -def get_bof_and_eof(file: Path) -> Tuple[str, str]: - """Get the first and last kilobyte of a file. - - Args: - file (Path): Path to file - - Returns: - Tuple[str,str]: BOF and then EOF as `str`. - """ - with file.open("rb") as file_bytes: - # BOF - bof = file_bytes.read(1024).hex() - # Navigate to EOF - try: - file_bytes.seek(-1024, 2) - except OSError: - # File too small :) - file_bytes.seek(-file_bytes.tell(), 2) - eof = file_bytes.read(1024).hex() - return (bof, eof) - - -def get_aca_signature(bof: str, eof: str) -> Optional[dict]: - """Get the ACA signature of a file type, if one exists. Else return `None`. 
- - Args: - bof (str): The first kilobyte of a file - eof (str): The last kilobyte of a file - - Returns: - Optional(str): The signature, if one was found. - """ - aca_signatures: list[dict] = costum_sigs() - for sig in aca_signatures: - if "bof" in sig and "eof" in sig: - bof_pattern = re.compile(sig["bof"]) - eof_pattern = re.compile(sig["eof"]) - if sig["operator"] == "OR": - if bof_pattern.search(bof) or eof_pattern.search(eof): - return sig - elif sig["operator"] == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): - return sig - elif "bof" in sig: - bof_pattern = re.compile(sig["bof"]) - if bof_pattern.search(bof): - return sig - elif "eof" in sig: - eof_pattern = re.compile(sig["eof"]) - if eof_pattern.search(eof): - return sig - return None - - -def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]: - """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained. - - Also updates FileInfo with obtained PUID, signature name, and warning if applicable. - - Parameters - ---------- - path : pathlib.Path - Path in which to identify files. - - Returns: - ------- - Dict[Path, Identification] - Dictionary containing file path and associated identification - information obtained from siegfried's stdout. - - """ - id_dict: dict[Path, Identification] = {} - - id_result = run_sf_and_get_results_json(path) - - # We get identifiers as a list containing the ditionary, - # soo we have to get the one element our of it - results_dict: Optional[dict] = id_result.get("identifiers", None)[0] - if results_dict and log: - DROID_file_version: Optional[str] = results_dict.get("details") - log.info( - "Running sf with the following version of DROID: " + DROID_file_version if DROID_file_version else "", - ) - for file_result in id_result.get("files", []): - match: dict[str, Any] = {} - for id_match in file_result.get("matches"): - if id_match.get("ns") == "pronom": - match = id_match - if match: - file_identification: Identification - file_path: Path = Path(file_result["filename"]) - - puid = None if match.get("id", "").lower() == "unknown" else match.get("id") - - signature_and_version = None - signature = match.get("format") - version = match.get("version") - if signature: - signature_and_version = f"{signature} ({version})" - warning: str = match.get("warning", "").capitalize() - file_size: int = file_result.get("filesize") - file_errors: Optional[str] = file_result.get("errors", None) - if file_errors: - warning = warning + " ; Errors: " + file_errors - file_identification = Identification( - puid=puid, - signature=signature_and_version or None, - warning=warning or None, - size=file_size, - ) - - # unindentified files - if puid is None: - file_identification = aca_id_for_file_id(file_path, file_identification) - - # re-identify files, warnings or not! - if puid in to_re_identify(): - file_identification = aca_id_for_file_id(file_path, file_identification) - - # Possible MS Office files identified as markup (XML, HTML etc.) - if puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] and "Extension mismatch" in warning: - file_identification = aca_id_for_file_id(file_path, file_identification) - - id_dict.update({file_path: file_identification}) - - return id_dict - - -# --- -# Aux. methods, used as helper methods for the rest of the methods. 
-# --- - - -def run_sf_and_get_results_json(path: Path) -> dict: - """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. - - Args: - path (Path): A path to a folder containg files or subfolder with files (or more subfolders! ) - - Raises: - OSError: If there is an error with running siegfried or loading the results as a .json file, raises OSError - - Returns: - dict: dictionary created from .json output of siegfried - """ - try: - sf_proc = subprocess.run( - ["sf", "-json", "-multi", "1024", str(path)], - check=True, - capture_output=True, - ) - except Exception as error: - raise OSError(error) - - try: - id_result: dict = json.loads(sf_proc.stdout) - except Exception as error: - raise OSError(error) - - return id_result - - -def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: - """Update a file Identification data model with an PUID and signature given as a dictionary. - - It is primarily used by the `costum_id` method. - Also adds a 'Extension Mismatch' warning if the extension of the file is not as we excpect from the given dict. - - Args: - path (Path): Path to the file - file_id (Identification): File identification data model - signature (dict[str, str]): Dictionary with new values for PUID and signature. - """ - file_id.puid = signature["puid"] - file_id.signature = signature["signature"] - if path.suffix.lower() != signature["extension"].lower(): - file_id.warning = "Extension mismatch" - else: - file_id.warning = None diff --git a/acacore/utils/functions.py b/acacore/utils/functions.py index 6aa828e..e72e482 100644 --- a/acacore/utils/functions.py +++ b/acacore/utils/functions.py @@ -1,6 +1,4 @@ -from typing import Callable -from typing import Optional -from typing import TypeVar +from typing import Callable, Optional, TypeVar T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/utils/log.py b/acacore/utils/log.py index 4569a8b..8b249b8 100644 --- a/acacore/utils/log.py +++ b/acacore/utils/log.py @@ -1,8 +1,4 @@ -from logging import FileHandler -from logging import Formatter -from logging import INFO -from logging import Logger -from logging import getLogger +from logging import INFO, FileHandler, Formatter, Logger, getLogger from pathlib import Path @@ -20,7 +16,6 @@ def setup_logger(log_name: str, log_path: Path) -> Logger: Returns: A Logger instance. """ - if not log_path.parent.exists(): Path.mkdir(log_path.parent, parents=True, exist_ok=True)
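
Taken together, the changes above let a file first be identified with siegfried and then re-checked against the ACA custom signatures. The snippet below is a minimal usage sketch, not part of the patch: it assumes the `sf` binary is on PATH, that a `File` can be constructed from its `relative_path` alone (other fields may be required in practice), and the file path used is made up.

from pathlib import Path

from acacore.models.file import File
from acacore.reference_files.ref_files import costum_sigs
from acacore.siegfried.siegfried import Siegfried

sf = Siegfried()  # assumes the `sf` binary is available on PATH

# Hypothetical input; constructing File from only relative_path is an assumption.
file = File(relative_path=Path("data/sample.bin"))

# First pass: raw siegfried result for this single file. Mapping the result
# onto file.puid / file.signature is left to the caller here.
sf_result = file.identify(sf)

# Second pass: if no PUID has been set, fall back to the ACA custom signatures,
# which match regex patterns against the first and last kilobyte of the file.
# The call is a no-op when nothing matches.
if file.puid is None:
    file.re_identify_with_aca(costum_sigs())

print(file.puid, file.signature)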