From 1e2f9b5f4e99a788b79994de36c0b06509392ee8 Mon Sep 17 00:00:00 2001 From: Magnus Lindholm Date: Tue, 10 Oct 2023 14:43:06 +0200 Subject: [PATCH] added functionality to run and use siegfried * Also added functions to get info from reference files * Updated the docstring for several of the new functions * Decomposed the new functions into multiple helper functions to avoid duplicate code --- acacore/reference_files/__init__.py | 2 + acacore/reference_files/ref_files.py | 44 +++++ acacore/siegfried_utils/__init__.py | 2 + acacore/siegfried_utils/identify.py | 249 +++++++++++++++++++++++++++ 4 files changed, 297 insertions(+) create mode 100644 acacore/reference_files/__init__.py create mode 100644 acacore/reference_files/ref_files.py create mode 100644 acacore/siegfried_utils/__init__.py create mode 100644 acacore/siegfried_utils/identify.py diff --git a/acacore/reference_files/__init__.py b/acacore/reference_files/__init__.py new file mode 100644 index 0000000..c603535 --- /dev/null +++ b/acacore/reference_files/__init__.py @@ -0,0 +1,2 @@ +"""Collection of methods that allows us to """ +from . import ref_files \ No newline at end of file diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py new file mode 100644 index 0000000..50456bd --- /dev/null +++ b/acacore/reference_files/ref_files.py @@ -0,0 +1,44 @@ +import json +from functools import lru_cache +from http.client import HTTPResponse +from urllib import request + + +@lru_cache +def to_re_identify() -> dict[str, str]: + """Gets the json file with the different formats that we wish to reidentify. + + Is kept updated on the reference-files repo. The function caches the result, + soo multiple calls in the same run should not be an issue. + """ + response: HTTPResponse = request.urlopen("https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json") + if response.getcode() != 200: + raise ConnectionError + + re_identify_map: dict[str, str] = json.loads(response.read()) + + if re_identify_map is None: + raise ConnectionError + + return re_identify_map + + +@lru_cache +def costum_sigs() -> list[dict]: + """Gets the json file with our own costum formats in a list. + + Is kept updated on the reference-files repo. The function caches the result, + soo multiple calls in the same run should not be an issue. + """ + response: HTTPResponse = request.urlopen( + "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json", + ) + if response.getcode() != 200: + raise ConnectionError + + re_identify_map: dict[str, str] = json.loads(response.read()) + + if re_identify_map is None: + raise ConnectionError + + return re_identify_map diff --git a/acacore/siegfried_utils/__init__.py b/acacore/siegfried_utils/__init__.py new file mode 100644 index 0000000..b0f30ed --- /dev/null +++ b/acacore/siegfried_utils/__init__.py @@ -0,0 +1,2 @@ +"""Place for functions and classes that act as an entrance to siegfried""" +from . import identify \ No newline at end of file diff --git a/acacore/siegfried_utils/identify.py b/acacore/siegfried_utils/identify.py new file mode 100644 index 0000000..6425bad --- /dev/null +++ b/acacore/siegfried_utils/identify.py @@ -0,0 +1,249 @@ +import json +import re +import subprocess +from logging import Logger +from pathlib import Path +from typing import Any, Optional, Tuple + +from acacore.models.identification import Identification +from acacore.reference_files.ref_files import costum_sigs, to_re_identify + + +def aca_id_for_file_id(path: Path, file_id: Identification) -> Identification: + """Uses the BOF and EOF to try to determine a ACAUID for a file and update its Identification datastructure. + + If none can be found, simply return the same Identification data structure as it got in the beginning. + + Args: + path (Path): PAth to the file + file_id (Identification): The file identification data structure that should be updated with the new values + + Returns: + Identification: The updated file data structure. + """ + bof, eof = get_bof_and_eof(path) + + sig_for_file = get_aca_signature(bof, eof) + if sig_for_file: + update_file_id(path, file_id, sig_for_file) + return file_id + +def aca_id(path: Path) -> Optional[str]: + """Tries to find one of our costum ACAUID's for a file, based on its BOF and EOF bytes. Returns `None` if none is found. + + Args: + path (Path): Path to the file to be examined + + Returns: + Optional[str]: Possible ACAUID + """ + bof, eof = get_bof_and_eof(path) + + sig_for_file = get_aca_signature(bof, eof) + if not sig_for_file: + return None + + return sig_for_file.get("puid", None) + +def get_bof_and_eof(file: Path) -> Tuple[str,str]: + """Get the first and last kilobyte of a file. + + Args: + file (Path): Path to file + + Returns: + Tuple[str,str]: BOF and then EOF as `str`. + """ + with file.open("rb") as file_bytes: + # BOF + bof = file_bytes.read(1024).hex() + # Navigate to EOF + try: + file_bytes.seek(-1024, 2) + except OSError: + # File too small :) + file_bytes.seek(-file_bytes.tell(), 2) + eof = file_bytes.read(1024).hex() + return (bof, eof) + +def get_aca_signature(bof: str, eof: str) -> Optional[dict]: + """Get the ACA signature of a file type, if one exists. Else return `None`. + + Args: + bof (str): The first kilobyte of a file + eof (str): The last kilobyte of a file + + Returns: + Optional(str): The signature, if one was found. + """ + aca_signatures: list[dict] = costum_sigs() + for sig in aca_signatures: + if "bof" in sig and "eof" in sig: + bof_pattern = re.compile(sig["bof"]) + eof_pattern = re.compile(sig["eof"]) + if sig["operator"] == "OR": + if bof_pattern.search(bof) or eof_pattern.search(eof): + return sig + elif sig["operator"] == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): + return sig + elif "bof" in sig: + bof_pattern = re.compile(sig["bof"]) + if bof_pattern.search(bof): + return sig + elif "eof" in sig: + eof_pattern = re.compile(sig["eof"]) + if eof_pattern.search(eof): + return sig + return None + +def sf_id_puid(path: Path) -> Optional[str]: + """Identify a file and return only its PUID using siegfried. + + Args: + path (`Path`): Path to the file to identify. + + Returns: + Optional[str]: The PUID of the file, or `None` if none is found + """ + id_result = run_sf_and_get_results_json(path) + + puid: Optional[str] = None + for file_result in id_result.get("files", []): + match: dict = {} + for id_match in file_result.get("matches"): + if id_match.get("ns") == "pronom": + match = id_match + if match: + puid = None if match.get("id", "").lower() == "unknown" else match.get("id") + break + return puid + + +def sf_id_full(path: Path, log: Optional[Logger] = None) -> dict[Path, Identification]: + """Identify multiple files using `siegfried`, and return a dictionary mapping the files path to a Identification datastructure containing the info obtained. + + Also updates FileInfo with obtained PUID, signature name, and warning if applicable. + + Parameters + ---------- + path : pathlib.Path + Path in which to identify files. + + Returns: + ------- + Dict[Path, Identification] + Dictionary containing file path and associated identification + information obtained from siegfried's stdout. + + """ + id_dict: dict[Path, Identification] = {} + + id_result = run_sf_and_get_results_json(path) + + + # We get identifiers as a list containing the ditionary, + # soo we have to get the one element our of it + results_dict: Optional[dict] = id_result.get("identifiers", None)[0] + if results_dict and log: + DROID_file_version: Optional[str] = results_dict.get("details") + log.info( + "Running sf with the following version of DROID: " + DROID_file_version + if DROID_file_version + else "", + ) + for file_result in id_result.get("files", []): + match: dict[str, Any] = {} + for id_match in file_result.get("matches"): + if id_match.get("ns") == "pronom": + match = id_match + if match: + file_identification: Identification + file_path: Path = Path(file_result["filename"]) + + puid = None if match.get("id", "").lower() == "unknown" else match.get("id") + + signature_and_version = None + signature = match.get("format") + version = match.get("version") + if signature: + signature_and_version = f"{signature} ({version})" + warning: str = match.get("warning", "").capitalize() + file_size: int = file_result.get("filesize") + file_errors: Optional[str] = file_result.get("errors", None) + if file_errors: + warning = warning + " ; Errors: " + file_errors + file_identification = Identification( + puid=puid, + signature=signature_and_version or None, + warning=warning or None, + size=file_size, + ) + + # unindentified files + if puid is None: + file_identification = aca_id_for_file_id(file_path, file_identification) + + # re-identify files, warnings or not! + if puid in to_re_identify(): + file_identification = aca_id_for_file_id(file_path, file_identification) + + # Possible MS Office files identified as markup (XML, HTML etc.) + if ( + puid in ["fmt/96", "fmt/101", "fmt/583", "x-fmt/263"] + and "Extension mismatch" in warning + ): + file_identification = aca_id_for_file_id(file_path, file_identification) + + id_dict.update({file_path: file_identification}) + + return id_dict + +# --- +# Aux. methods, used as helper methods for the rest of the methods. +# --- + +def run_sf_and_get_results_json(path: Path) -> dict: + """Run `siegfried` on `path`, and return the result as a dictionary build from the .json output of `sf`. + + Args: + path (Path): A path to a folder containg files or subfolder with files (or more subfolders! ) + + Raises: + OSError: If there is an error with running siegfried or loading the results as a .json file, raises OSError + + Returns: + dict: dictionary created from .json output of siegfried + """ + try: + sf_proc = subprocess.run( + ["sf", "-json", "-multi", "1024", str(path)], + check=True, + capture_output=True, + ) + except Exception as error: + raise OSError(error) + + try: + id_result: dict = json.loads(sf_proc.stdout) + except Exception as error: + raise OSError(error) + + return id_result + +def update_file_id(path: Path, file_id: Identification, signature: dict[str, str]) -> None: + """Update a file Identification data model with an PUID and signature given as a dictionary. + + It is primarily used by the `costum_id` method. + Also adds a 'Extension Mismatch' warning if the extension of the file is not as we excpect from the given dict. + + Args: + path (Path): Path to the file + file_id (Identification): File identification data model + signature (dict[str, str]): Dictionary with new values for PUID and signature. + """ + file_id.puid = signature["puid"] + file_id.signature = signature["signature"] + if path.suffix.lower() != signature["extension"].lower(): + file_id.warning = "Extension mismatch" + else: + file_id.warning = None