Skip to content

Commit

Permalink
refactored my siegfried implementation to be compatible with the one …
Browse files Browse the repository at this point in the history
…on main: Now only the methods that handled aca identification remain and they have been moved to the file class. * Also added datamodels to check the data we get from reference files
  • Loading branch information
Magnus Lindholm committed Oct 17, 2023
1 parent 077f9e2 commit 874fed7
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 297 deletions.
16 changes: 3 additions & 13 deletions acacore/database/base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
from sqlite3 import Connection
from sqlite3 import Connection, OperationalError
from sqlite3 import Cursor as SQLiteCursor
from sqlite3 import OperationalError
from typing import Any
from typing import Generator
from typing import Generic
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import overload
from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload

from pydantic.main import BaseModel

from .column import Column
from .column import SelectColumn
from .column import model_to_columns
from .column import Column, SelectColumn, model_to_columns

T = TypeVar("T")
R = TypeVar("R")
Expand Down
7 changes: 1 addition & 6 deletions acacore/database/column.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
from datetime import datetime
from pathlib import Path
from typing import Callable
from typing import Generic
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import Callable, Generic, Optional, Type, TypeVar, Union
from uuid import UUID

from pydantic import BaseModel
Expand Down
9 changes: 3 additions & 6 deletions acacore/database/files_db.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from datetime import datetime
from os import PathLike
from sqlite3 import Connection
from typing import Optional
from typing import Type
from typing import Union
from typing import Optional, Type, Union
from uuid import UUID

from acacore.utils.functions import or_none
from .base import Column
from .base import FileDBBase
from .base import SelectColumn

from .base import Column, FileDBBase, SelectColumn


class FileDB(FileDBBase):
Expand Down
1 change: 1 addition & 0 deletions acacore/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from . import file_data
from . import identification
from . import metadata
from . import reference_files
76 changes: 72 additions & 4 deletions acacore/models/file.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import re
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple

from pydantic import Field
from pydantic import UUID4
from pydantic import UUID4, Field

from acacore.models.reference_files import CustomSignature
from acacore.siegfried.siegfried import Siegfried, SiegfriedFile
from acacore.utils.io import size_fmt

from .base import ACABase
from .identification import Identification


# -----------------------------------------------------------------------------
# Model
# -----------------------------------------------------------------------------
Expand All @@ -31,6 +33,50 @@ class File(ACABase):
warning: Optional[str] = None
action: Optional[str] = None

def identify(self, sf: Siegfried) -> SiegfriedFile:
    """Run `siegfried` on this file and return its identification result.

    Args:
        sf (Siegfried): The Siegfried wrapper used to perform the identification.

    Returns:
        SiegfriedFile: The first entry of whatever ``sf.identify`` returns for this file's path.
    """
    absolute_path = self.get_absolute_path()
    results = sf.identify(absolute_path)
    # NOTE(review): this indexes the value returned by ``sf.identify`` directly;
    # confirm that the returned object supports ``[0]``.
    return results[0]

def re_identify_with_aca(self, costum_sigs: list[CustomSignature]) -> None:
    """Try to assign a custom (ACA) PUID and signature based on the file's BOF and EOF.

    Each signature's ``bof``/``eof`` value is compiled as a regular expression and searched
    in the hex-encoded beginning/end of the file returned by ``get_bof_and_eof``.
    Every signature is checked; each match overwrites ``puid`` and ``signature``, so the
    last matching signature in the list is the one that remains.
    If nothing matches, the file is left untouched.

    Args:
        costum_sigs: The custom signatures the file should be checked against.
            The list should be found on the `reference_files` repo.
    """
    bof, eof = self.get_bof_and_eof()

    for sig in costum_sigs:
        if sig.bof and sig.eof:
            # Both patterns are compiled up front, whatever the operator is.
            head_re = re.compile(sig.bof)
            tail_re = re.compile(sig.eof)
            if sig.operator == "OR":
                matched = bool(head_re.search(bof) or tail_re.search(eof))
            elif sig.operator == "AND":
                matched = bool(head_re.search(bof) and tail_re.search(eof))
            else:
                # Any other operator (including None) never matches.
                matched = False
        elif sig.bof:
            matched = re.compile(sig.bof).search(bof) is not None
        elif sig.eof:
            matched = re.compile(sig.eof).search(eof) is not None
        else:
            # A signature without BOF and EOF patterns cannot match anything.
            matched = False

        if matched:
            self.puid = sig.puid
            self.signature = sig.signature

def get_absolute_path(self, root: Optional[Path] = None) -> Path:
    """Return the path of the file, either joined onto ``root`` or resolved.

    Args:
        root: Optional directory that ``relative_path`` is joined onto.

    Returns:
        Path: ``root`` joined with ``relative_path`` when ``root`` is given,
            otherwise ``relative_path.resolve()``.
    """
    if root:
        return root.joinpath(self.relative_path)
    return self.relative_path.resolve()

Expand Down Expand Up @@ -96,6 +142,28 @@ def size_fmt(self) -> str:
"""
return str(size_fmt(self.get_absolute_path().stat().st_size))

def get_bof_and_eof(self) -> Tuple[str, str]:
    """Get the first and last kilobyte of the file as hexadecimal strings.

    The file is located through ``get_absolute_path()``. For files smaller than
    1024 bytes both values cover the whole file; for an empty file both are ``""``.

    Returns:
        Tuple[str, str]: BOF and then EOF, each hex-encoded with ``bytes.hex()``.
    """
    from os import SEEK_END  # local import: keeps the block self-contained

    chunk_size = 1024

    with self.get_absolute_path().open("rb") as file_bytes:
        # BOF
        bof = file_bytes.read(chunk_size).hex()
        # Jump to the end to learn the size, then step back at most one chunk.
        # Clamping at 0 handles files shorter than a chunk without relying on
        # an OSError from a negative seek.
        size = file_bytes.seek(0, SEEK_END)
        file_bytes.seek(max(size - chunk_size, 0))
        # EOF
        eof = file_bytes.read(chunk_size).hex()

    return (bof, eof)


class ArchiveFile(Identification, File):
"""ArchiveFile data model."""
Expand Down
5 changes: 2 additions & 3 deletions acacore/models/file_data.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from pathlib import Path
from typing import Any
from typing import ClassVar
from typing import Optional
from typing import Any, ClassVar, Optional

from pydantic import model_validator

from acacore.database.files_db import FileDB

from .base import ACABase
from .file import ArchiveFile

Expand Down
3 changes: 1 addition & 2 deletions acacore/models/history.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from datetime import datetime
from typing import Optional
from typing import Union
from typing import Optional, Union

from pydantic import UUID4

Expand Down
3 changes: 1 addition & 2 deletions acacore/models/identification.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Any
from typing import Optional
from typing import Any, Optional

from pydantic import model_validator

Expand Down
24 changes: 24 additions & 0 deletions acacore/models/reference_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Data models for the data on saved to different .json files on the `reference_files` repo."""
from typing import Optional

from pydantic import BaseModel


class ReIdentifyModel(BaseModel):
    """Data model for the `to_reidentify` from reference_files.

    Every field is optional and defaults to ``None``.
    """

    # NOTE(review): presumably the PUID of the format that should be re-identified — confirm.
    puid: Optional[str] = None
    # NOTE(review): presumably the human-readable name of the format — confirm.
    name: Optional[str] = None
    # NOTE(review): presumably the file extension associated with the format — confirm.
    ext: Optional[str] = None
    # NOTE(review): presumably the explanation of why the format is re-identified — confirm.
    reasoning: Optional[str] = None


class CustomSignature(BaseModel):
    """Data model for the `costum_signatures` from reference_files.

    Every field is optional and defaults to ``None``.
    """

    # Pattern for the beginning of the file; `File.re_identify_with_aca` compiles it
    # as a regular expression and searches the hex-encoded BOF with it.
    bof: Optional[str] = None
    # Pattern for the end of the file; used the same way against the hex-encoded EOF.
    eof: Optional[str] = None
    # How `bof` and `eof` are combined when both are set; `File.re_identify_with_aca`
    # only acts on the values "OR" and "AND".
    operator: Optional[str] = None
    # Value copied to the file's `puid` when the signature matches.
    puid: Optional[str] = None
    # Value copied to the file's `signature` when the signature matches.
    signature: Optional[str] = None
    # NOTE(review): presumably the file extension tied to this signature — confirm.
    extension: Optional[str] = None
25 changes: 19 additions & 6 deletions acacore/reference_files/ref_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from http.client import HTTPResponse
from urllib import request

from models.reference_files import CustomSignature, ReIdentifyModel


@lru_cache
def to_re_identify() -> dict[str, str]:
def to_re_identify() -> list[ReIdentifyModel]:
"""Gets the json file with the different formats that we wish to reidentify.
Is kept updated on the reference-files repo. The function caches the result,
Expand All @@ -22,11 +24,16 @@ def to_re_identify() -> dict[str, str]:
if re_identify_map is None:
raise ConnectionError

return re_identify_map
result_list: list[ReIdentifyModel] = []
for key, values in re_identify_map.items():
result = ReIdentifyModel(puid=key, **values)
result_list.append(result)

return result_list


@lru_cache
def costum_sigs() -> list[dict]:
def costum_sigs() -> list[CustomSignature]:
"""Gets the json file with our own costum formats in a list.
Is kept updated on the reference-files repo. The function caches the result,
Expand All @@ -38,9 +45,15 @@ def costum_sigs() -> list[dict]:
if response.getcode() != 200:
raise ConnectionError

re_identify_map: dict[str, str] = json.loads(response.read())
custom_list: list[dict] = json.loads(response.read())

if re_identify_map is None:
if custom_list is None:
raise ConnectionError

return re_identify_map
result_list: list[CustomSignature] = []

for values in custom_list:
result = CustomSignature(**values)
result_list.append(result)

return result_list
35 changes: 16 additions & 19 deletions acacore/siegfried/siegfried.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run
from typing import Optional
from typing import Union
from subprocess import CompletedProcess, run
from typing import Optional, Union

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator

from acacore.exceptions.files import IdentificationError


def _check_process(process: CompletedProcess):
"""
Raises:
IdentificationError: if the process ends with a return code other than 0
"""
IdentificationError: if the process ends with a return code other than 0.
""" # noqa: D205
if process.returncode != 0:
raise IdentificationError(
process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}"
process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}",
)


Expand All @@ -32,8 +27,8 @@ class SiegfriedIdentifier(BaseModel):

class SiegfriedMatch(BaseModel):
ns: str
id: Optional[str]
format: str
id: Optional[str] # noqa: A003
format: str # noqa: A003
version: str
mime: str
match_class: str = Field(alias="class")
Expand Down Expand Up @@ -74,16 +69,16 @@ class Siegfried:
https://github.com/richardlehane/siegfried
"""

def __init__(self, binary: Union[str, PathLike] = "sf"):
def __init__(self, binary: Union[str, PathLike] = "sf") -> None:
"""
Args:
binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable
binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable.
Raises:
IdentificationError: If Siegfried is not configured properly
"""
IdentificationError: If Siegfried is not configured properly.
""" # noqa: D205
self.binary: str = str(binary)
_check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8"))
_check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8")) # noqa: PLW1510

def identify(self, path: Union[str, PathLike]) -> SiegfriedResult:
"""
Expand All @@ -102,6 +97,7 @@ def identify(self, path: Union[str, PathLike]) -> SiegfriedResult:
[self.binary, "-json", "-multi", "1024", str(path)],
capture_output=True,
encoding="utf-8",
check=False,
)
_check_process(process)
try:
Expand All @@ -117,7 +113,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]:
paths: The paths to the files
Returns:
A tuple of tuples joining the paths with their SiegfriedFile result
tuple[tuple[Path, SiegfriedFile]: A tuple of tuples joining the paths with their SiegfriedFile result
Raises:
IdentificationError: If there is an error calling Siegfried or processing its results
Expand All @@ -126,6 +122,7 @@ def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile]]:
[self.binary, "-json", "-multi", "1024", *map(str, paths)],
capture_output=True,
encoding="utf-8",
check=False,
)
_check_process(process)
try:
Expand Down
Empty file.
Loading

0 comments on commit 874fed7

Please sign in to comment.