Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added functionality to run and use siegfried #12

Merged
merged 12 commits into from
Oct 17, 2023
16 changes: 3 additions & 13 deletions acacore/database/base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
from sqlite3 import Connection
from sqlite3 import Connection, OperationalError
from sqlite3 import Cursor as SQLiteCursor
from sqlite3 import OperationalError
from typing import Any
from typing import Generator
from typing import Generic
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import overload
from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload

from pydantic.main import BaseModel

from .column import Column
from .column import SelectColumn
from .column import model_to_columns
from .column import Column, SelectColumn, model_to_columns

T = TypeVar("T")
R = TypeVar("R")
Expand Down
7 changes: 1 addition & 6 deletions acacore/database/column.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
from datetime import datetime
from pathlib import Path
from typing import Callable
from typing import Generic
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import Callable, Generic, Optional, Type, TypeVar, Union
from uuid import UUID

from pydantic import BaseModel
Expand Down
16 changes: 6 additions & 10 deletions acacore/database/files_db.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from datetime import datetime
from os import PathLike
from sqlite3 import Connection
from typing import Optional
from typing import Type
from typing import Union
from typing import Optional, Type, Union
from uuid import UUID

from acacore.utils.functions import or_none
from .base import Column
from .base import FileDBBase
from .base import SelectColumn

from .base import Column, FileDBBase, SelectColumn


class FileDB(FileDBBase):
Expand Down Expand Up @@ -44,8 +41,7 @@ def __init__(
to avoid parsing overhead.
uri: If set to True, database is interpreted as a URI with a file path and an optional query string.
"""
from acacore.models.file import ConvertedFile
from acacore.models.file import File
from acacore.models.file import ConvertedFile, File
from acacore.models.history import HistoryEntry
from acacore.models.identification import SignatureCount
from acacore.models.metadata import Metadata
Expand Down Expand Up @@ -148,6 +144,6 @@ def add_history(
operation=operation,
data=data,
reason=reason,
time=time or datetime.utcnow(),
)
time=time or datetime.now(), # noqa: DTZ005
),
)
1 change: 1 addition & 0 deletions acacore/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from . import file_data
from . import identification
from . import metadata
from . import reference_files
76 changes: 72 additions & 4 deletions acacore/models/file.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import re
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple

from pydantic import Field
from pydantic import UUID4
from pydantic import UUID4, Field

from acacore.models.reference_files import CustomSignature
from acacore.siegfried.siegfried import Siegfried, SiegfriedFile
from acacore.utils.io import size_fmt

from .base import ACABase
from .identification import Identification


# -----------------------------------------------------------------------------
# Model
# -----------------------------------------------------------------------------
Expand All @@ -31,6 +33,50 @@ class File(ACABase):
warning: Optional[str] = None
action: Optional[str] = None

def identify(self, sf: Siegfried) -> SiegfriedFile:
    """Run `siegfried` on this file and return the first result.

    Args:
        sf (Siegfried): The Siegfried wrapper object used to perform the identification.

    Returns:
        SiegfriedFile: The first element of what `sf.identify` returns for the file's absolute path.
    """
    # NOTE(review): the value returned by `sf.identify` is indexed with `[0]`;
    # confirm that the returned object supports integer indexing.
    target = self.get_absolute_path()
    outcome = sf.identify(target)
    return outcome[0]

def re_identify_with_aca(self, costum_sigs: list[CustomSignature]) -> None:
    """Try to assign a custom PUID and signature to the file from its BOF and EOF.

    The hex-encoded beginning and end of the file (see `get_bof_and_eof`) are searched
    with the regular expressions of every custom signature. Every signature is checked,
    so when several of them match, the values of the last matching one are kept.
    If no signature matches, the file is left untouched.

    Matching rules per signature:
        * both `bof` and `eof` set: operator "OR" needs one of them to match,
          operator "AND" needs both; any other operator never matches.
        * only `bof` set: the BOF pattern must match.
        * only `eof` set: the EOF pattern must match.

    Args:
        costum_sigs: The custom signatures that the file should be checked against.
            The list should be found on the `reference_files` repo.
    """
    head, tail = self.get_bof_and_eof()

    for candidate in costum_sigs:
        # Only the patterns that are actually present get compiled.
        head_re = re.compile(candidate.bof) if candidate.bof else None
        tail_re = re.compile(candidate.eof) if candidate.eof else None

        if head_re is not None and tail_re is not None:
            if candidate.operator == "OR":
                matched = bool(head_re.search(head) or tail_re.search(tail))
            elif candidate.operator == "AND":
                matched = bool(head_re.search(head) and tail_re.search(tail))
            else:
                matched = False
        elif head_re is not None:
            matched = head_re.search(head) is not None
        elif tail_re is not None:
            matched = tail_re.search(tail) is not None
        else:
            matched = False

        if matched:
            self.puid = candidate.puid
            self.signature = candidate.signature

def get_absolute_path(self, root: Optional[Path] = None) -> Path:
    """Return the absolute path of the file.

    Args:
        root: Optional directory that `relative_path` is joined onto.

    Returns:
        Path: `root` joined with `relative_path` when a root is given, otherwise
            `relative_path` resolved with `Path.resolve()`.
    """
    if not root:
        return self.relative_path.resolve()
    return root.joinpath(self.relative_path)

Expand Down Expand Up @@ -96,6 +142,28 @@ def size_fmt(self) -> str:
"""
return str(size_fmt(self.get_absolute_path().stat().st_size))

def get_bof_and_eof(self) -> Tuple[str, str]:
    """Get the first and last kilobyte of the file as hexadecimal strings.

    The file is located with `get_absolute_path()` and read in binary mode.
    For files smaller than 1024 bytes both values cover the whole file, and
    for an empty file both values are empty strings.

    Returns:
        Tuple[str, str]: Hex-encoded BOF (first 1024 bytes) and then hex-encoded
            EOF (last 1024 bytes), both as `str`.
    """
    chunk_size = 1024
    file = self.get_absolute_path()
    with file.open("rb") as file_bytes:
        # BOF
        bof = file_bytes.read(chunk_size).hex()
        # Seeking to the end (whence=2) returns the file size, which lets us clamp
        # the EOF offset instead of relying on a failed negative seek for small files.
        size = file_bytes.seek(0, 2)
        file_bytes.seek(max(size - chunk_size, 0))
        # EOF
        eof = file_bytes.read(chunk_size).hex()
    return (bof, eof)


class ArchiveFile(Identification, File):
"""ArchiveFile data model."""
Expand Down
5 changes: 2 additions & 3 deletions acacore/models/file_data.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from pathlib import Path
from typing import Any
from typing import ClassVar
from typing import Optional
from typing import Any, ClassVar, Optional

from pydantic import model_validator

from acacore.database.files_db import FileDB

from .base import ACABase
from .file import ArchiveFile

Expand Down
3 changes: 1 addition & 2 deletions acacore/models/history.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from datetime import datetime
from typing import Optional
from typing import Union
from typing import Optional, Union

from pydantic import UUID4

Expand Down
3 changes: 1 addition & 2 deletions acacore/models/identification.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Any
from typing import Optional
from typing import Any, Optional

from pydantic import model_validator

Expand Down
24 changes: 24 additions & 0 deletions acacore/models/reference_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Data models for the data on saved to different .json files on the `reference_files` repo."""
from typing import Optional

from pydantic import BaseModel


class ReIdentifyModel(BaseModel):
"""Data model for one entry of the `to_reidentify` data from reference_files.

Every field is optional and defaults to None.
"""

# `to_re_identify` fills this from the entry's key in the downloaded JSON mapping.
puid: Optional[str] = None
# The remaining fields are filled from the entry's value in the JSON mapping.
name: Optional[str] = None
ext: Optional[str] = None
# presumably the explanation of why the format should be re-identified — TODO confirm
reasoning: Optional[str] = None


class CustomSignature(BaseModel):
"""Data model for one entry of the `custom_signatures` data from reference_files.

Every field is optional and defaults to None.
"""

# Patterns for the beginning and the end of a file. `File.re_identify_with_aca` compiles
# them as regular expressions and searches the hex-encoded first / last 1024 bytes with them.
bof: Optional[str] = None
eof: Optional[str] = None
# How `bof` and `eof` are combined when both are set; "OR" and "AND" are the
# values that `File.re_identify_with_aca` acts on.
operator: Optional[str] = None
# Copied onto the `puid` and `signature` of a file that matches the signature.
puid: Optional[str] = None
signature: Optional[str] = None
# presumably the file extension associated with the signature — TODO confirm
extension: Optional[str] = None
2 changes: 2 additions & 0 deletions acacore/reference_files/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"""Collection of methods that allows us to """
from . import ref_files
59 changes: 59 additions & 0 deletions acacore/reference_files/ref_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
from functools import lru_cache
from http.client import HTTPResponse
from urllib import request

from models.reference_files import CustomSignature, ReIdentifyModel


@lru_cache
def to_re_identify() -> list[ReIdentifyModel]:
    """Get the list of formats that we wish to re-identify.

    Downloads `to_reidentify.json`, which is kept updated on the reference-files repo.
    The result is cached with `lru_cache`, so multiple calls in the same run only hit
    the network once. The cached list object is shared between callers and should
    therefore not be mutated.

    Returns:
        list[ReIdentifyModel]: One model per entry of the JSON mapping, with the
            entry's key used as `puid` and the entry's value as the remaining fields.

    Raises:
        ConnectionError: If the response status is not 200 or the document is JSON `null`.
    """
    response: HTTPResponse
    # Use the response as a context manager so the connection is closed on every path.
    with request.urlopen(
        "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json",
    ) as response:
        status = response.getcode()
        if status != 200:
            raise ConnectionError(f"Could not fetch to_reidentify.json: HTTP status {status}")
        # Each value is itself a mapping of field names to values (it is unpacked below).
        re_identify_map: dict[str, dict] = json.loads(response.read())

    if re_identify_map is None:
        raise ConnectionError("Could not fetch to_reidentify.json: the document is empty")

    return [ReIdentifyModel(puid=key, **values) for key, values in re_identify_map.items()]


@lru_cache
def costum_sigs() -> list[CustomSignature]:
    """Get our own custom format signatures as a list.

    Downloads `custom_signatures.json`, which is kept updated on the reference-files repo.
    The result is cached with `lru_cache`, so multiple calls in the same run only hit
    the network once. The cached list object is shared between callers and should
    therefore not be mutated.

    Returns:
        list[CustomSignature]: One model per entry of the downloaded JSON list.

    Raises:
        ConnectionError: If the response status is not 200 or the document is JSON `null`.
    """
    response: HTTPResponse
    # Use the response as a context manager so the connection is closed on every path.
    with request.urlopen(
        "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json",
    ) as response:
        status = response.getcode()
        if status != 200:
            raise ConnectionError(f"Could not fetch custom_signatures.json: HTTP status {status}")
        custom_list: list[dict] = json.loads(response.read())

    if custom_list is None:
        raise ConnectionError("Could not fetch custom_signatures.json: the document is empty")

    return [CustomSignature(**values) for values in custom_list]
35 changes: 16 additions & 19 deletions acacore/siegfried/siegfried.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run
from typing import Optional
from typing import Union
from subprocess import CompletedProcess, run
from typing import Optional, Union

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator

from acacore.exceptions.files import IdentificationError


def _check_process(process: CompletedProcess):
"""
Raises:
IdentificationError: if the process ends with a return code other than 0
"""
IdentificationError: if the process ends with a return code other than 0.
""" # noqa: D205
if process.returncode != 0:
raise IdentificationError(
process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}"
process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}",
)


Expand All @@ -32,8 +27,8 @@ class SiegfriedIdentifier(BaseModel):

class SiegfriedMatch(BaseModel):
ns: str
id: Optional[str]
format: str
id: Optional[str] # noqa: A003
format: str # noqa: A003
version: str
mime: str
match_class: str = Field(alias="class")
Expand Down Expand Up @@ -74,16 +69,16 @@ class Siegfried:
https://github.com/richardlehane/siegfried
"""

def __init__(self, binary: Union[str, PathLike] = "sf") -> None:
    """Store the Siegfried binary and verify that it can be run.

    Args:
        binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable.

    Raises:
        IdentificationError: If Siegfried is not configured properly.
    """
    self.binary: str = str(binary)
    # Run `<binary> -v` once as a sanity check. A non-zero return code is turned into an
    # IdentificationError by _check_process, so `check=False` is passed explicitly, in
    # line with the other `run` calls of this class.
    process = run([self.binary, "-v"], capture_output=True, encoding="utf-8", check=False)
    _check_process(process)

def identify(self, path: Union[str, PathLike]) -> SiegfriedResult:
"""
Expand All @@ -102,6 +97,7 @@ def identify(self, path: Union[str, PathLike]) -> SiegfriedResult:
[self.binary, "-json", "-multi", "1024", str(path)],
capture_output=True,
encoding="utf-8",
check=False,
)
_check_process(process)
try:
Expand All @@ -117,7 +113,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]:
paths: The paths to the files

Returns:
A tuple of tuples joining the paths with their SiegfriedFile result
tuple[tuple[Path, SiegfriedFile]: A tuple of tuples joining the paths with their SiegfriedFile result

Raises:
IdentificationError: If there is an error calling Siegfried or processing its results
Expand All @@ -126,6 +122,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]:
[self.binary, "-json", "-multi", "1024", *map(str, paths)],
capture_output=True,
encoding="utf-8",
check=False,
)
_check_process(process)
try:
Expand Down
Loading
Loading