Skip to content

Commit

Permalink
Merge branch 'main' into add-siegfried_functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Magnus Lindholm committed Oct 17, 2023
2 parents 27fd8b7 + aca0141 commit 077f9e2
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 135 deletions.
3 changes: 1 addition & 2 deletions acacore/database/files_db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime
from os import PathLike
from sqlite3 import Connection
from typing import Any
from typing import Optional
from typing import Type
from typing import Union
Expand Down Expand Up @@ -137,7 +136,7 @@ def add_history(
self,
uuid: UUID,
operation: str,
data: Any, # noqa: ANN401
data: Optional[Union[dict, list, str, int, float, bool, datetime]],
reason: Optional[str] = None,
*,
time: Optional[datetime] = None,
Expand Down
2 changes: 2 additions & 0 deletions acacore/siegfried/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .siegfried import Siegfried
from .siegfried import SiegfriedResult
135 changes: 135 additions & 0 deletions acacore/siegfried/siegfried.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from datetime import datetime
from os import PathLike
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run
from typing import Optional
from typing import Union

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_validator

from acacore.exceptions.files import IdentificationError


def _check_process(process: CompletedProcess):
"""
Raises:
IdentificationError: if the process ends with a return code other than 0
"""
if process.returncode != 0:
raise IdentificationError(
process.stderr or process.stdout or f"Unknown siegfried error code {process.returncode}"
)


class SiegfriedIdentifier(BaseModel):
    """
    An entry of the ``identifiers`` list of a Siegfried result.

    Both fields are required strings.
    """

    name: str
    details: str


class SiegfriedMatch(BaseModel):
    """
    A single entry of the ``matches`` list of a file in a Siegfried result.

    The ``id`` field is normalised on validation: blank values and the literal "unknown" (in any letter case)
    are stored as ``None``.
    """

    ns: str
    id: Optional[str]
    format: str
    version: str
    mime: str
    # "class" is a reserved word in Python, so the value is populated through an alias.
    match_class: str = Field(alias="class")
    basis: str
    warning: str

    # noinspection PyNestedDecorators
    @field_validator("id")
    @classmethod
    def unknown_id(cls, _id: Optional[str]):
        """
        Normalise the match identifier.

        Args:
            _id: The identifier value being validated

        Returns:
            The identifier stripped of surrounding whitespace, or None if it is missing, blank, or "unknown"
        """
        if _id is None:
            return None

        cleaned = _id.strip()
        if not cleaned or cleaned.lower() == "unknown":
            return None
        return cleaned


class SiegfriedFile(BaseModel):
    """
    An entry of the ``files`` list of a Siegfried result.

    All fields are required; ``matches`` holds one SiegfriedMatch per reported match.
    """

    filename: str
    filesize: int
    modified: datetime
    errors: str
    matches: list[SiegfriedMatch]


class SiegfriedResult(BaseModel):
    """
    The top-level object parsed from the JSON output of Siegfried.

    Validation fails if the output contains top-level keys that are not declared on this model.
    """

    # Reject undeclared top-level keys instead of silently ignoring them.
    model_config = ConfigDict(extra="forbid")

    siegfried: str
    scandate: datetime
    signature: str
    created: datetime
    identifiers: list[SiegfriedIdentifier]
    files: list[SiegfriedFile]


class Siegfried:
    """
    A wrapper class to use the Siegfried program with Python and return the results with Pydantic models.

    Attributes:
        binary: The path or program name used to invoke Siegfried

    See Also:
        https://github.com/richardlehane/siegfried
    """

    def __init__(self, binary: Union[str, PathLike] = "sf"):
        """
        Args:
            binary: The path to the Siegfried binary, or the program name if it is included in the PATH variable

        Raises:
            IdentificationError: If Siegfried is not configured properly
            OSError: Propagated from ``subprocess.run`` if the binary cannot be executed at all
        """
        self.binary: str = str(binary)
        # Asking for the version is a cheap way to check that the binary can be run.
        _check_process(run([self.binary, "-v"], capture_output=True, encoding="utf-8"))

    def _identify(self, *paths: str) -> SiegfriedResult:
        """
        Run Siegfried on the given paths and parse its JSON output.

        Args:
            *paths: The paths to pass to Siegfried, already converted to strings

        Returns:
            A SiegfriedResult object

        Raises:
            IdentificationError: If there is an error calling Siegfried or processing its results
        """
        process: CompletedProcess = run(
            [self.binary, "-json", "-multi", "1024", *paths],
            capture_output=True,
            encoding="utf-8",
        )
        _check_process(process)
        try:
            return SiegfriedResult.model_validate_json(process.stdout)
        except ValueError as err:
            # Keep the original validation error attached as the cause for easier debugging.
            raise IdentificationError(err) from err

    def identify(self, path: Union[str, PathLike]) -> SiegfriedResult:
        """
        Identify a file.

        Args:
            path: The path to the file

        Returns:
            A SiegfriedResult object

        Raises:
            IdentificationError: If there is an error calling Siegfried or processing its results
        """
        return self._identify(str(path))

    def identify_many(self, paths: list[Path]) -> tuple[tuple[Path, SiegfriedFile], ...]:
        """
        Identify multiple files.

        Args:
            paths: The paths to the files

        Returns:
            A tuple of tuples joining the paths with their SiegfriedFile result. An empty tuple is returned
            without calling Siegfried if no paths are given.

        Raises:
            IdentificationError: If there is an error calling Siegfried or processing its results, or if the
                number of results does not match the number of paths
        """
        # Materialise once so the paths can be counted and iterated more than once.
        paths = list(paths)
        if not paths:
            return ()

        result = self._identify(*map(str, paths))

        # Results are paired with paths by position, so a count mismatch would silently mis-assign them.
        if len(result.files) != len(paths):
            raise IdentificationError(
                f"Siegfried returned {len(result.files)} results for {len(paths)} paths"
            )

        return tuple(zip(paths, result.files))
17 changes: 12 additions & 5 deletions acacore/utils/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@


def setup_logger(log_name: str, log_path: Path) -> Logger:
"""General method for setting op a log object. Ensures that the different logs we use across tools are standardized.
"""
General method for setting up a log object. Ensures that the different logs we use across tools are standardized.
Args:
log_name: The name given to the logger within the logging module's own namespace. All descendant logs need
to have a name of the form 'log_name.descendant_log_name', which often is the name of the module or submodule
that the function is called from.
log_path: The path directly to the log as a `txt` file. If the file is not there, it will be created.
If it already exists, it will append the messages to the file.
## Args
* log_name: the name given to the logger within the logging modules own namespace. All descendant logs needs to have a name on the form 'log_name.descendant_log_name', which often is the name of the module or submodule that the function is called from.
* log_path: the path directly to the log as a `txt` file. If the file is not there, it will be created. If it already exists, it will append the messages to the file.
Returns:
A Logger instance.
"""
# If the parents of the file does not exist, then we make them

if not log_path.parent.exists():
Path.mkdir(log_path.parent, parents=True, exist_ok=True)

Expand Down
Loading

0 comments on commit 077f9e2

Please sign in to comment.