Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add limited support for zip output (no encryption yet) #66

Merged
merged 22 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions acquire/outputs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from acquire.outputs.dir import DirectoryOutput
from acquire.outputs.tar import TarOutput
from acquire.outputs.zip import ZipOutput

__all__ = [
"DirectoryOutput",
"TarOutput",
"ZipOutput"
]

OUTPUTS = {
"tar": TarOutput,
"dir": DirectoryOutput,
"zip": ZipOutput
}
112 changes: 112 additions & 0 deletions acquire/outputs/zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import io
janstarke marked this conversation as resolved.
Show resolved Hide resolved
import stat
import zipfile
from pathlib import Path
from typing import BinaryIO, Optional, Union
from datetime import datetime

from dissect.target.filesystem import FilesystemEntry

from acquire.crypt import EncryptedStream
from acquire.outputs.base import Output


class ZipOutput(Output):
"""zip archive acquire output format. Output can be compressed and/or encrypted.
janstarke marked this conversation as resolved.
Show resolved Hide resolved

Args:
path: The path to write the zip archive to.
compress: Whether to compress the zip archive.
encrypt: Whether to encrypt the zip archive.
public_key: The RSA public key to encrypt the header with.
"""

def __init__(
self,
path: Path,
compress: bool = False,
encrypt: bool = False,
public_key: Optional[bytes] = None,
) -> None:
ext = ".zip" if ".zip" not in path.suffixes else ""

Check warning on line 31 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L31

Added line #L31 was not covered by tests

if encrypt:
ext += ".enc"

Check warning on line 34 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L33-L34

Added lines #L33 - L34 were not covered by tests

self._fh = None
self.path = path.with_suffix(path.suffix + ext)

Check warning on line 37 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L36-L37

Added lines #L36 - L37 were not covered by tests

if compress:
self.compression = zipfile.ZIP_LZMA

Check warning on line 40 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L39-L40

Added lines #L39 - L40 were not covered by tests
else:
self.compression = zipfile.ZIP_STORED

Check warning on line 42 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L42

Added line #L42 was not covered by tests

if encrypt:
Schamper marked this conversation as resolved.
Show resolved Hide resolved
self._fh = EncryptedStream(self.path.open("wb"), public_key)
self.archive = zipfile.ZipFile(fileobj=self._fh,

Check warning on line 46 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L44-L46

Added lines #L44 - L46 were not covered by tests
mode='w',
compression=self.compression,
allowZip64=True)
else:
self.archive = zipfile.ZipFile(self.path,

Check warning on line 51 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L51

Added line #L51 was not covered by tests
mode='w',
compression=self.compression,
allowZip64=True)


def write(
self,
output_path: str,
fh: BinaryIO,
entry: Optional[Union[FilesystemEntry, Path]],
janstarke marked this conversation as resolved.
Show resolved Hide resolved
size: Optional[int] = None,
) -> None:
"""Write a filesystem entry or file-like object to a zip file.

Args:
output_path: The path of the entry in the output format.
fh: The file-like object of the entry to write.
entry: The optional filesystem entry of the entry to write.
size: The optional file size in bytes of the entry to write.
"""
lstat = None
size = size or getattr(fh, "size", None)

Check warning on line 73 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L72-L73

Added lines #L72 - L73 were not covered by tests

if size is None and fh.seekable():
offset = fh.tell()
fh.seek(0, io.SEEK_END)
size = fh.tell()
fh.seek(offset)

Check warning on line 79 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L75-L79

Added lines #L75 - L79 were not covered by tests

info = zipfile.ZipInfo()
info.filename = output_path

Check warning on line 82 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L81-L82

Added lines #L81 - L82 were not covered by tests

# some BinaryIO objects have no size, but `zipfile` uses len() in several places,
# so we read the whole data first
try:
info.file_size = len(fh)
except:
fh = fh.read()
info.file_size = len(fh)

Check warning on line 90 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L86-L90

Added lines #L86 - L90 were not covered by tests
janstarke marked this conversation as resolved.
Show resolved Hide resolved

if entry:

Check warning on line 92 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L92

Added line #L92 was not covered by tests

if entry.is_symlink():

Check warning on line 94 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L94

Added line #L94 was not covered by tests
# System which created ZIP archive, 3 = Unix; 0 = Windows
# Windows does not have symlinks, so this must be a unixoid system
info.create_system = 3

Check warning on line 97 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L97

Added line #L97 was not covered by tests

# The Python zipfile module accepts the 16-bit "Mode" field (that stores st_mode field from struct stat, containing user/group/other permissions, setuid/setgid and symlink info, etc) of the ASi extra block for Unix as bits 16-31 of the external_attr
unix_st_mode = stat.S_IFLNK | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH
info.external_attr = unix_st_mode << 16

Check warning on line 101 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L100-L101

Added lines #L100 - L101 were not covered by tests

lstat = entry.lstat()
if lstat:
dt = datetime.fromtimestamp(lstat.st_mtime)
info.date_time = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)

Check warning on line 106 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L103-L106

Added lines #L103 - L106 were not covered by tests

self.archive.writestr(info, fh, compress_type=self.compression)

Check warning on line 108 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L108

Added line #L108 was not covered by tests
janstarke marked this conversation as resolved.
Show resolved Hide resolved

def close(self) -> None:
"""Closes the archive file."""
self.archive.close()

Check warning on line 112 in acquire/outputs/zip.py

View check run for this annotation

Codecov / codecov/patch

acquire/outputs/zip.py#L112

Added line #L112 was not covered by tests
janstarke marked this conversation as resolved.
Show resolved Hide resolved
Loading