diff --git a/acacore/database/base.py b/acacore/database/base.py index d95857a..6bee159 100644 --- a/acacore/database/base.py +++ b/acacore/database/base.py @@ -1,23 +1,13 @@ from datetime import datetime from os import PathLike from pathlib import Path -from sqlite3 import Connection +from sqlite3 import Connection, OperationalError from sqlite3 import Cursor as SQLiteCursor -from sqlite3 import OperationalError -from typing import Any -from typing import Generator -from typing import Generic -from typing import Optional -from typing import Type -from typing import TypeVar -from typing import Union -from typing import overload +from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload from pydantic.main import BaseModel -from .column import Column -from .column import SelectColumn -from .column import model_to_columns +from .column import Column, SelectColumn, model_to_columns T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/database/column.py b/acacore/database/column.py index acc06dd..249c452 100644 --- a/acacore/database/column.py +++ b/acacore/database/column.py @@ -1,11 +1,6 @@ from datetime import datetime from pathlib import Path -from typing import Callable -from typing import Generic -from typing import Optional -from typing import Type -from typing import TypeVar -from typing import Union +from typing import Callable, Generic, Optional, Type, TypeVar, Union from uuid import UUID from pydantic import BaseModel diff --git a/acacore/database/files_db.py b/acacore/database/files_db.py index 999effd..d36d21a 100644 --- a/acacore/database/files_db.py +++ b/acacore/database/files_db.py @@ -1,15 +1,12 @@ from datetime import datetime from os import PathLike from sqlite3 import Connection -from typing import Optional -from typing import Type -from typing import Union +from typing import Optional, Type, Union from uuid import UUID from acacore.utils.functions import or_none -from .base import Column -from .base import FileDBBase -from .base import SelectColumn + +from .base import Column, FileDBBase, SelectColumn class FileDB(FileDBBase): @@ -44,8 +41,7 @@ def __init__( to avoid parsing overhead. uri: If set to True, database is interpreted as a URI with a file path and an optional query string. """ - from acacore.models.file import ConvertedFile - from acacore.models.file import File + from acacore.models.file import ConvertedFile, File from acacore.models.history import HistoryEntry from acacore.models.identification import SignatureCount from acacore.models.metadata import Metadata @@ -148,6 +144,6 @@ def add_history( operation=operation, data=data, reason=reason, - time=time or datetime.utcnow(), - ) + time=time or datetime.now(), # noqa: DTZ005 + ), ) diff --git a/acacore/models/__init__.py b/acacore/models/__init__.py index 7b1839e..eea3b30 100644 --- a/acacore/models/__init__.py +++ b/acacore/models/__init__.py @@ -3,3 +3,4 @@ from . import file_data from . import identification from . import metadata +from . import reference_files diff --git a/acacore/models/file.py b/acacore/models/file.py index d1af288..f81a47c 100644 --- a/acacore/models/file.py +++ b/acacore/models/file.py @@ -1,17 +1,19 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +import re from pathlib import Path -from typing import Optional +from typing import Optional, Tuple -from pydantic import Field -from pydantic import UUID4 +from pydantic import UUID4, Field +from acacore.models.reference_files import CustomSignature +from acacore.siegfried.siegfried import Siegfried, SiegfriedFile from acacore.utils.io import size_fmt + from .base import ACABase from .identification import Identification - # ----------------------------------------------------------------------------- # Model # ----------------------------------------------------------------------------- @@ -31,6 +33,50 @@ class File(ACABase): warning: Optional[str] = None action: Optional[str] = None + def identify(self, sf: Siegfried) -> SiegfriedFile: + """Identify the file using `siegfried`. + + Args: + sf (Siegfried): A Siegfried class object + + Returns: + SiegfriedFile: A dataclass object containing the results from the identification + """ + return sf.identify(self.get_absolute_path())[0] + + def re_identify_with_aca(self, costum_sigs: list[CustomSignature]) -> None: + """Uses the BOF and EOF to try to determine a ACAUID for the file. + + The costum_sigs list should be found on the `reference_files` repo. + If no match can be found, the method does nothing. + + Args: + costum_sigs: A list of the costum_signatures that the file should be checked against + """ + bof, eof = self.get_bof_and_eof() + # We have to go through all of the signatures in order to check their BOF en EOF with the file. + for sig in costum_sigs: + if sig.bof and sig.eof: + bof_pattern = re.compile(sig.bof) + eof_pattern = re.compile(sig.eof) + if sig.operator == "OR": + if bof_pattern.search(bof) or eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.operator == "AND" and bof_pattern.search(bof) and eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.bof: + bof_pattern = re.compile(sig.bof) + if bof_pattern.search(bof): + self.puid = sig.puid + self.signature = sig.signature + elif sig.eof: + eof_pattern = re.compile(sig.eof) + if eof_pattern.search(eof): + self.puid = sig.puid + self.signature = sig.signature + def get_absolute_path(self, root: Optional[Path] = None) -> Path: return root.joinpath(self.relative_path) if root else self.relative_path.resolve() @@ -96,6 +142,28 @@ def size_fmt(self) -> str: """ return str(size_fmt(self.get_absolute_path().stat().st_size)) + def get_bof_and_eof(self) -> Tuple[str, str]: + """Get the first and last kilobyte of the file. + + Args: + file (Path): Path to file + + Returns: + Tuple[str,str]: BOF and then EOF as `str`. + """ + file = self.get_absolute_path() + with file.open("rb") as file_bytes: + # BOF + bof = file_bytes.read(1024).hex() + # Navigate to EOF + try: + file_bytes.seek(-1024, 2) + except OSError: + # File too small :) + file_bytes.seek(-file_bytes.tell(), 2) + eof = file_bytes.read(1024).hex() + return (bof, eof) + class ArchiveFile(Identification, File): """ArchiveFile data model.""" diff --git a/acacore/models/file_data.py b/acacore/models/file_data.py index 6f918d8..bfe76c3 100644 --- a/acacore/models/file_data.py +++ b/acacore/models/file_data.py @@ -1,11 +1,10 @@ from pathlib import Path -from typing import Any -from typing import ClassVar -from typing import Optional +from typing import Any, ClassVar, Optional from pydantic import model_validator from acacore.database.files_db import FileDB + from .base import ACABase from .file import ArchiveFile diff --git a/acacore/models/history.py b/acacore/models/history.py index a8e7e80..e6e458d 100644 --- a/acacore/models/history.py +++ b/acacore/models/history.py @@ -1,6 +1,5 @@ from datetime import datetime -from typing import Optional -from typing import Union +from typing import Optional, Union from pydantic import UUID4 diff --git a/acacore/models/identification.py b/acacore/models/identification.py index a5c4e60..9effa0b 100644 --- a/acacore/models/identification.py +++ b/acacore/models/identification.py @@ -1,5 +1,4 @@ -from typing import Any -from typing import Optional +from typing import Any, Optional from pydantic import model_validator diff --git a/acacore/models/reference_files.py b/acacore/models/reference_files.py new file mode 100644 index 0000000..9681653 --- /dev/null +++ b/acacore/models/reference_files.py @@ -0,0 +1,24 @@ +"""Data models for the data on saved to different .json files on the `reference_files` repo.""" +from typing import Optional + +from pydantic import BaseModel + + +class ReIdentifyModel(BaseModel): + """Data model for the `to_reidentify` from reference_files.""" + + puid: Optional[str] = None + name: Optional[str] = None + ext: Optional[str] = None + reasoning: Optional[str] = None + + +class CustomSignature(BaseModel): + """Data model for the `costum_signatures` from reference_files.""" + + bof: Optional[str] = None + eof: Optional[str] = None + operator: Optional[str] = None + puid: Optional[str] = None + signature: Optional[str] = None + extension: Optional[str] = None diff --git a/acacore/reference_files/__init__.py b/acacore/reference_files/__init__.py new file mode 100644 index 0000000..b82916f --- /dev/null +++ b/acacore/reference_files/__init__.py @@ -0,0 +1,2 @@ +"""Collection of methods that allows us to """ +from . import ref_files diff --git a/acacore/reference_files/ref_files.py b/acacore/reference_files/ref_files.py new file mode 100644 index 0000000..6fbcbcc --- /dev/null +++ b/acacore/reference_files/ref_files.py @@ -0,0 +1,59 @@ +import json +from functools import lru_cache +from http.client import HTTPResponse +from urllib import request + +from models.reference_files import CustomSignature, ReIdentifyModel + + +@lru_cache +def to_re_identify() -> list[ReIdentifyModel]: + """Gets the json file with the different formats that we wish to reidentify. + + Is kept updated on the reference-files repo. The function caches the result, + soo multiple calls in the same run should not be an issue. + """ + response: HTTPResponse = request.urlopen( + "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json", + ) + if response.getcode() != 200: + raise ConnectionError + + re_identify_map: dict[str, str] = json.loads(response.read()) + + if re_identify_map is None: + raise ConnectionError + + result_list: list[ReIdentifyModel] = [] + for key, values in re_identify_map.items(): + result = ReIdentifyModel(puid=key, **values) + result_list.append(result) + + return result_list + + +@lru_cache +def costum_sigs() -> list[CustomSignature]: + """Gets the json file with our own costum formats in a list. + + Is kept updated on the reference-files repo. The function caches the result, + soo multiple calls in the same run should not be an issue. + """ + response: HTTPResponse = request.urlopen( + "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json", + ) + if response.getcode() != 200: + raise ConnectionError + + custom_list: list[dict] = json.loads(response.read()) + + if custom_list is None: + raise ConnectionError + + result_list: list[CustomSignature] = [] + + for values in custom_list: + result = CustomSignature(**values) + result_list.append(result) + + return result_list diff --git a/acacore/siegfried/siegfried.py b/acacore/siegfried/siegfried.py index 0b5b602..91d8c04 100644 --- a/acacore/siegfried/siegfried.py +++ b/acacore/siegfried/siegfried.py @@ -2,11 +2,9 @@ from os import PathLike from pathlib import Path from re import compile as re_compile -from subprocess import CompletedProcess -from subprocess import run +from subprocess import CompletedProcess, run from typing import Literal -from typing import Optional -from typing import Union +from typing import Optional, Union from pydantic import BaseModel from pydantic import ConfigDict @@ -26,8 +24,8 @@ def _check_process(process: CompletedProcess) -> CompletedProcess: """ Raises: - IdentificationError: if the process ends with a return code other than 0 - """ + IdentificationError: if the process ends with a return code other than 0. + """ # noqa: D205 if process.returncode != 0: raise IdentificationError( process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}" @@ -67,8 +65,8 @@ class SiegfriedMatch(BaseModel): """ ns: str - id: Optional[str] - format: str + id: Optional[str] # noqa: A003 + format: str # noqa: A003 version: Optional[str] = None mime: str match_class: Optional[str] = Field(None, alias="class") diff --git a/acacore/utils/functions.py b/acacore/utils/functions.py index 6aa828e..e72e482 100644 --- a/acacore/utils/functions.py +++ b/acacore/utils/functions.py @@ -1,6 +1,4 @@ -from typing import Callable -from typing import Optional -from typing import TypeVar +from typing import Callable, Optional, TypeVar T = TypeVar("T") R = TypeVar("R") diff --git a/acacore/utils/log.py b/acacore/utils/log.py index 4569a8b..8b249b8 100644 --- a/acacore/utils/log.py +++ b/acacore/utils/log.py @@ -1,8 +1,4 @@ -from logging import FileHandler -from logging import Formatter -from logging import INFO -from logging import Logger -from logging import getLogger +from logging import INFO, FileHandler, Formatter, Logger, getLogger from pathlib import Path @@ -20,7 +16,6 @@ def setup_logger(log_name: str, log_path: Path) -> Logger: Returns: A Logger instance. """ - if not log_path.parent.exists(): Path.mkdir(log_path.parent, parents=True, exist_ok=True)