Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
# Conflicts:
#	acacore/siegfried/siegfried.py
  • Loading branch information
MatteoCampinoti94 committed Oct 17, 2023
2 parents 8d3bc02 + 7e1f442 commit 52b47b2
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 57 deletions.
16 changes: 3 additions & 13 deletions acacore/database/base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
from sqlite3 import Connection
from sqlite3 import Connection, OperationalError
from sqlite3 import Cursor as SQLiteCursor
from sqlite3 import OperationalError
from typing import Any
from typing import Generator
from typing import Generic
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import overload
from typing import Any, Generator, Generic, Optional, Type, TypeVar, Union, overload

from pydantic.main import BaseModel

from .column import Column
from .column import SelectColumn
from .column import model_to_columns
from .column import Column, SelectColumn, model_to_columns

T = TypeVar("T")
R = TypeVar("R")
Expand Down
7 changes: 1 addition & 6 deletions acacore/database/column.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
from datetime import datetime
from pathlib import Path
from typing import Callable
from typing import Generic
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union
from typing import Callable, Generic, Optional, Type, TypeVar, Union
from uuid import UUID

from pydantic import BaseModel
Expand Down
16 changes: 6 additions & 10 deletions acacore/database/files_db.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from datetime import datetime
from os import PathLike
from sqlite3 import Connection
from typing import Optional
from typing import Type
from typing import Union
from typing import Optional, Type, Union
from uuid import UUID

from acacore.utils.functions import or_none
from .base import Column
from .base import FileDBBase
from .base import SelectColumn

from .base import Column, FileDBBase, SelectColumn


class FileDB(FileDBBase):
Expand Down Expand Up @@ -44,8 +41,7 @@ def __init__(
to avoid parsing overhead.
uri: If set to True, database is interpreted as a URI with a file path and an optional query string.
"""
from acacore.models.file import ConvertedFile
from acacore.models.file import File
from acacore.models.file import ConvertedFile, File
from acacore.models.history import HistoryEntry
from acacore.models.identification import SignatureCount
from acacore.models.metadata import Metadata
Expand Down Expand Up @@ -148,6 +144,6 @@ def add_history(
operation=operation,
data=data,
reason=reason,
time=time or datetime.utcnow(),
)
time=time or datetime.now(), # noqa: DTZ005
),
)
1 change: 1 addition & 0 deletions acacore/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from . import file_data
from . import identification
from . import metadata
from . import reference_files
76 changes: 72 additions & 4 deletions acacore/models/file.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import re
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple

from pydantic import Field
from pydantic import UUID4
from pydantic import UUID4, Field

from acacore.models.reference_files import CustomSignature
from acacore.siegfried.siegfried import Siegfried, SiegfriedFile
from acacore.utils.io import size_fmt

from .base import ACABase
from .identification import Identification


# -----------------------------------------------------------------------------
# Model
# -----------------------------------------------------------------------------
Expand All @@ -31,6 +33,50 @@ class File(ACABase):
warning: Optional[str] = None
action: Optional[str] = None

def identify(self, sf: Siegfried) -> SiegfriedFile:
    """Run `siegfried` on this file and return the first result.

    Args:
        sf (Siegfried): The Siegfried object used to perform the identification.

    Returns:
        SiegfriedFile: The first item of what ``sf.identify`` returns for the
            file's absolute path.
    """
    target = self.get_absolute_path()
    results = sf.identify(target)
    return results[0]

def re_identify_with_aca(self, costum_sigs: list[CustomSignature]) -> None:
    """Try to assign a PUID and signature from custom BOF/EOF signatures.

    The hex-encoded beginning (BOF) and end (EOF) of the file are searched
    with the regular expressions of each signature:

    * both ``bof`` and ``eof`` set: the two searches are combined according
      to ``operator`` ("OR" or "AND"; any other value never matches),
    * only ``bof`` or only ``eof`` set: that single search decides.

    Every matching signature overwrites ``puid`` and ``signature``, so when
    several signatures match, the last one in the list wins. If nothing
    matches, the file is left untouched.

    Args:
        costum_sigs: The custom signatures to check the file against.
    """
    head, tail = self.get_bof_and_eof()
    for candidate in costum_sigs:
        if candidate.bof and candidate.eof:
            # Both patterns are compiled before any search is attempted.
            head_re = re.compile(candidate.bof)
            tail_re = re.compile(candidate.eof)
            if candidate.operator == "OR":
                matched = bool(head_re.search(head) or tail_re.search(tail))
            elif candidate.operator == "AND":
                matched = bool(head_re.search(head) and tail_re.search(tail))
            else:
                matched = False
        elif candidate.bof:
            matched = re.compile(candidate.bof).search(head) is not None
        elif candidate.eof:
            matched = re.compile(candidate.eof).search(tail) is not None
        else:
            matched = False

        if matched:
            self.puid = candidate.puid
            self.signature = candidate.signature

def get_absolute_path(self, root: Optional[Path] = None) -> Path:
    """Return the location of the file as a path.

    Args:
        root: Optional directory that ``relative_path`` is joined onto.

    Returns:
        ``root`` joined with ``relative_path`` when a root is given (the
        result is not resolved); otherwise ``relative_path`` resolved.
    """
    if not root:
        return self.relative_path.resolve()
    return root.joinpath(self.relative_path)

Expand Down Expand Up @@ -96,6 +142,28 @@ def size_fmt(self) -> str:
"""
return str(size_fmt(self.get_absolute_path().stat().st_size))

def get_bof_and_eof(self) -> Tuple[str, str]:
    """Read the beginning and the end of the file as hexadecimal text.

    Up to 1024 bytes are read from each end of the file returned by
    ``get_absolute_path``. When seeking 1024 bytes back from the end fails,
    the end chunk is read from the start of the file instead.

    Returns:
        Tuple[str, str]: Hex of the first chunk (BOF), then hex of the last
            chunk (EOF).
    """
    path = self.get_absolute_path()
    with path.open("rb") as handle:
        head = handle.read(1024)
        try:
            # whence=2: the offset is relative to the end of the file.
            handle.seek(-1024, 2)
        except OSError:
            # The file is too small for a 1024-byte tail; step back by the
            # number of bytes already consumed instead.
            handle.seek(-handle.tell(), 2)
        tail = handle.read(1024)
    return head.hex(), tail.hex()


class ArchiveFile(Identification, File):
"""ArchiveFile data model."""
Expand Down
5 changes: 2 additions & 3 deletions acacore/models/file_data.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from pathlib import Path
from typing import Any
from typing import ClassVar
from typing import Optional
from typing import Any, ClassVar, Optional

from pydantic import model_validator

from acacore.database.files_db import FileDB

from .base import ACABase
from .file import ArchiveFile

Expand Down
3 changes: 1 addition & 2 deletions acacore/models/history.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from datetime import datetime
from typing import Optional
from typing import Union
from typing import Optional, Union

from pydantic import UUID4

Expand Down
3 changes: 1 addition & 2 deletions acacore/models/identification.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Any
from typing import Optional
from typing import Any, Optional

from pydantic import model_validator

Expand Down
24 changes: 24 additions & 0 deletions acacore/models/reference_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Data models for the data saved to the different .json files on the `reference_files` repo."""
from typing import Optional

from pydantic import BaseModel


class ReIdentifyModel(BaseModel):
    """Data model for one entry of the `to_reidentify` data from reference_files.

    Every field is optional and defaults to ``None``.
    """

    # Identifier of the format the entry is about.
    puid: Optional[str] = None
    # NOTE(review): presumably the human-readable format name — confirm against the JSON file.
    name: Optional[str] = None
    # NOTE(review): presumably the file extension tied to the format — confirm against the JSON file.
    ext: Optional[str] = None
    # NOTE(review): presumably a free-text explanation of why the format is listed — confirm.
    reasoning: Optional[str] = None


class CustomSignature(BaseModel):
    """Data model for one entry of the custom signatures from reference_files.

    Every field is optional and defaults to ``None``.
    """

    # Regular expression searched in the hex-encoded beginning of a file.
    bof: Optional[str] = None
    # Regular expression searched in the hex-encoded end of a file.
    eof: Optional[str] = None
    # How `bof` and `eof` are combined when both are set; "OR" and "AND" are
    # the values handled by `File.re_identify_with_aca`.
    operator: Optional[str] = None
    # Identifier copied onto a file when the signature matches.
    puid: Optional[str] = None
    # Signature name copied onto a file when the signature matches.
    signature: Optional[str] = None
    # NOTE(review): presumably the file extension of the format — confirm against the JSON file.
    extension: Optional[str] = None
2 changes: 2 additions & 0 deletions acacore/reference_files/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"""Collection of methods that allows us to fetch the reference data kept on the `reference-files` repo."""
from . import ref_files
59 changes: 59 additions & 0 deletions acacore/reference_files/ref_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
from functools import lru_cache
from http.client import HTTPResponse
from urllib import request

from models.reference_files import CustomSignature, ReIdentifyModel


@lru_cache
def to_re_identify() -> list[ReIdentifyModel]:
    """Fetch the formats that should be re-identified.

    Downloads ``to_reidentify.json`` from the reference-files repo and turns
    every entry into a ``ReIdentifyModel``, using the entry's key as ``puid``.
    The result is cached, so multiple calls in the same run only download the
    file once. The cached list object is shared between all callers.

    Returns:
        One ``ReIdentifyModel`` per entry of the JSON file.

    Raises:
        ConnectionError: If the response status is not 200 or the JSON
            payload is ``null``.
    """
    url = "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/to_reidentify.json"

    # Use the response as a context manager so the connection is always
    # closed, also when the status check or the JSON parsing fails.
    response: HTTPResponse
    with request.urlopen(url) as response:
        status = response.getcode()
        if status != 200:
            raise ConnectionError(f"Unexpected HTTP status {status} when fetching {url}")
        # Each value is itself a mapping: it is unpacked into the model below.
        re_identify_map: dict[str, dict] = json.loads(response.read())

    if re_identify_map is None:
        raise ConnectionError(f"No data received from {url}")

    return [ReIdentifyModel(puid=key, **values) for key, values in re_identify_map.items()]


@lru_cache
def costum_sigs() -> list[CustomSignature]:
    """Fetch our own custom format signatures.

    Downloads ``custom_signatures.json`` from the reference-files repo and
    turns every entry into a ``CustomSignature``. The result is cached, so
    multiple calls in the same run only download the file once. The cached
    list object is shared between all callers.

    Returns:
        One ``CustomSignature`` per entry of the JSON file.

    Raises:
        ConnectionError: If the response status is not 200 or the JSON
            payload is ``null``.
    """
    url = "https://raw.githubusercontent.com/aarhusstadsarkiv/reference-files/main/custom_signatures.json"

    # Use the response as a context manager so the connection is always
    # closed, also when the status check or the JSON parsing fails.
    response: HTTPResponse
    with request.urlopen(url) as response:
        status = response.getcode()
        if status != 200:
            raise ConnectionError(f"Unexpected HTTP status {status} when fetching {url}")
        custom_list: list[dict] = json.loads(response.read())

    if custom_list is None:
        raise ConnectionError(f"No data received from {url}")

    return [CustomSignature(**values) for values in custom_list]
14 changes: 6 additions & 8 deletions acacore/siegfried/siegfried.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from os import PathLike
from pathlib import Path
from re import compile as re_compile
from subprocess import CompletedProcess
from subprocess import run
from subprocess import CompletedProcess, run
from typing import Literal
from typing import Optional
from typing import Union
from typing import Optional, Union

from pydantic import BaseModel
from pydantic import ConfigDict
Expand All @@ -26,8 +24,8 @@
def _check_process(process: CompletedProcess) -> CompletedProcess:
"""
Raises:
IdentificationError: if the process ends with a return code other than 0
"""
IdentificationError: if the process ends with a return code other than 0.
""" # noqa: D205
if process.returncode != 0:
raise IdentificationError(
process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}"
Expand Down Expand Up @@ -67,8 +65,8 @@ class SiegfriedMatch(BaseModel):
"""

ns: str
id: Optional[str]
format: str
id: Optional[str] # noqa: A003
format: str # noqa: A003
version: Optional[str] = None
mime: str
match_class: Optional[str] = Field(None, alias="class")
Expand Down
4 changes: 1 addition & 3 deletions acacore/utils/functions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from typing import Callable
from typing import Optional
from typing import TypeVar
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")
Expand Down
7 changes: 1 addition & 6 deletions acacore/utils/log.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
from logging import FileHandler
from logging import Formatter
from logging import INFO
from logging import Logger
from logging import getLogger
from logging import INFO, FileHandler, Formatter, Logger, getLogger
from pathlib import Path


Expand All @@ -20,7 +16,6 @@ def setup_logger(log_name: str, log_path: Path) -> Logger:
Returns:
A Logger instance.
"""

if not log_path.parent.exists():
Path.mkdir(log_path.parent, parents=True, exist_ok=True)

Expand Down

0 comments on commit 52b47b2

Please sign in to comment.