Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add limited support for zip output (no encryption yet) #66

Merged
merged 22 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions acquire/outputs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from acquire.outputs.dir import DirectoryOutput
from acquire.outputs.tar import TarOutput
from acquire.outputs.zip import ZipOutput

__all__ = [
"DirectoryOutput",
"TarOutput",
]
__all__ = ["DirectoryOutput", "TarOutput", "ZipOutput"]

OUTPUTS = {
"tar": TarOutput,
"dir": DirectoryOutput,
}
OUTPUTS = {"tar": TarOutput, "dir": DirectoryOutput, "zip": ZipOutput}
114 changes: 114 additions & 0 deletions acquire/outputs/zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import io
janstarke marked this conversation as resolved.
Show resolved Hide resolved
import shutil
import stat
import zipfile
from datetime import datetime
from pathlib import Path
from typing import BinaryIO, Optional, Union

from dissect.target.filesystem import FilesystemEntry

from acquire.crypt import EncryptedStream
from acquire.outputs.base import Output


class ZipOutput(Output):
"""Zip archive acquire output format. Output can be compressed and/or encrypted.

Args:
path: The path to write the zip archive to.
compress: Whether to compress the zip archive.
encrypt: Whether to encrypt the zip archive.
public_key: The RSA public key to encrypt the header with.
"""

def __init__(
self,
path: Path,
compress: bool = False,
encrypt: bool = False,
public_key: Optional[bytes] = None,
) -> None:
ext = ".zip" if ".zip" not in path.suffixes else ""

Check warning on line 32 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L32

Added line #L32 was not covered by tests

if encrypt:
ext += ".enc"

Check warning on line 35 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L34-L35

Added lines #L34 - L35 were not covered by tests

self._fh = None
self.path = path.with_suffix(path.suffix + ext)

Check warning on line 38 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L37-L38

Added lines #L37 - L38 were not covered by tests

if compress:
self.compression = zipfile.ZIP_LZMA

Check warning on line 41 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L40-L41

Added lines #L40 - L41 were not covered by tests
else:
self.compression = zipfile.ZIP_STORED

Check warning on line 43 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L43

Added line #L43 was not covered by tests

if encrypt:
Schamper marked this conversation as resolved.
Show resolved Hide resolved
self._fh = EncryptedStream(self.path.open("wb"), public_key)
self.archive = zipfile.ZipFile(fileobj=self._fh, mode="w", compression=self.compression, allowZip64=True)

Check warning on line 47 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L45-L47

Added lines #L45 - L47 were not covered by tests
else:
self.archive = zipfile.ZipFile(self.path, mode="w", compression=self.compression, allowZip64=True)

Check warning on line 49 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L49

Added line #L49 was not covered by tests

def write(
self,
output_path: str,
fh: BinaryIO,
entry: Optional[Union[FilesystemEntry, Path]] = None,
size: Optional[int] = None,
) -> None:
"""Write a filesystem entry or file-like object to a zip file.

Args:
output_path: The path of the entry in the output format.
fh: The file-like object of the entry to write.
entry: The optional filesystem entry of the entry to write.
size: The optional file size in bytes of the entry to write.
"""
lstat = None
size = size or getattr(fh, "size", None)

Check warning on line 67 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L66-L67

Added lines #L66 - L67 were not covered by tests

if size is None and fh.seekable():
offset = fh.tell()
fh.seek(0, io.SEEK_END)
size = fh.tell()
fh.seek(offset)

Check warning on line 73 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L69-L73

Added lines #L69 - L73 were not covered by tests

info = zipfile.ZipInfo()
info.filename = output_path
info.file_size = size or 0

Check warning on line 77 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L75-L77

Added lines #L75 - L77 were not covered by tests

if entry:
if entry.is_symlink():

Check warning on line 80 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L79-L80

Added lines #L79 - L80 were not covered by tests
# System which created ZIP archive, 3 = Unix; 0 = Windows
# Windows does not have symlinks, so this must be a unixoid system
info.create_system = 3

Check warning on line 83 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L83

Added line #L83 was not covered by tests

# The Python zipfile module accepts the 16-bit "Mode" field (that stores st_mode field from
# struct stat, containing user/group/other permissions, setuid/setgid and symlink info, etc) of the
# ASi extra block for Unix as bits 16-31 of the external_attr
unix_st_mode = (

Check warning on line 88 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L88

Added line #L88 was not covered by tests
stat.S_IFLNK
| stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IXUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IXGRP
| stat.S_IROTH
| stat.S_IWOTH
| stat.S_IXOTH
)
info.external_attr = unix_st_mode << 16

Check warning on line 100 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L100

Added line #L100 was not covered by tests

lstat = entry.lstat()
if lstat:
dt = datetime.fromtimestamp(lstat.st_mtime)
info.date_time = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)

Check warning on line 105 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L102-L105

Added lines #L102 - L105 were not covered by tests

with self.archive.open(info, "w") as zfh:
shutil.copyfileobj(fh, zfh)

Check warning on line 108 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L107-L108

Added lines #L107 - L108 were not covered by tests

def close(self) -> None:
"""Closes the archive file."""
self.archive.close()
janstarke marked this conversation as resolved.
Show resolved Hide resolved
if self._fh:
self._fh.close()

Check warning on line 114 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L112-L114

Added lines #L112 - L114 were not covered by tests
Loading