From 74998ba01e0077e395131b8067c339bac216a9cc Mon Sep 17 00:00:00 2001 From: pyrco <105293448+pyrco@users.noreply.github.com> Date: Fri, 9 Feb 2024 13:39:24 +0100 Subject: [PATCH] New collection and de-duplication logic When paths contain intermediate symlinks, these intermediate symlinks are collected instead of collecting the unresolved path as is. De-duplication between sysvol and its equivalent drive letter path is done properly. Collection of special files (e.g. MFT, Recyclebin etc.) is properly abstracted. (DIS-1222) --- acquire/acquire.py | 190 +++++----- acquire/collector.py | 554 ++++++++++++++++++---------- acquire/utils.py | 36 +- tests/conftest.py | 4 + tests/test_collector.py | 775 +++++++++++++++++++++++++++------------- tests/test_utils.py | 194 ++++++---- 6 files changed, 1131 insertions(+), 622 deletions(-) diff --git a/acquire/acquire.py b/acquire/acquire.py index 9bb64949..5146c63b 100644 --- a/acquire/acquire.py +++ b/acquire/acquire.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import argparse import enum import functools @@ -16,9 +18,9 @@ from collections import defaultdict, namedtuple from itertools import product from pathlib import Path -from typing import Iterator, Optional, Union +from typing import BinaryIO, Callable, Iterator, Optional, Union -from dissect.target import Target, exceptions +from dissect.target import Target from dissect.target.filesystem import Filesystem from dissect.target.filesystems import ntfs from dissect.target.helpers import fsutil @@ -149,46 +151,48 @@ def misc_osx_user_homes(target: Target) -> Iterator[fsutil.TargetPath]: def from_user_home(target: Target, path: str) -> Iterator[str]: try: for user_details in target.user_details.all_with_home(): - yield normalize_path(target, user_details.home_path.joinpath(path), lower_case=False) + yield user_details.home_path.joinpath(path).as_posix() except Exception as e: log.warning("Error occurred when requesting all user homes") log.debug("", exc_info=e) 
misc_user_homes = MISC_MAPPING.get(target.os, misc_unix_user_homes) for user_dir in misc_user_homes(target): - yield str(user_dir.joinpath(path)) + yield user_dir.joinpath(path).as_posix() -def iter_ntfs_filesystems(target: Target) -> Iterator[tuple[ntfs.NtfsFilesystem, str, str]]: +def iter_ntfs_filesystems(target: Target) -> Iterator[tuple[ntfs.NtfsFilesystem, Optional[str], str, str]]: mount_lookup = defaultdict(list) for mount, fs in target.fs.mounts.items(): mount_lookup[fs].append(mount) - sysvol = target.fs.mounts["sysvol"] for fs in target.filesystems: - if fs in mount_lookup: - mountpoints = ", ".join(mount_lookup[fs]) - else: - mountpoints = "No mounts" - # The attr check is needed to correctly collect fake NTFS filesystems # where the MFT etc. are added to a VirtualFilesystem. This happens for # instance when the target is an acquired tar target. if not isinstance(fs, ntfs.NtfsFilesystem) and not hasattr(fs, "ntfs"): - log.warning("Skipping %s (%s) - not an NTFS filesystem", fs, mountpoints) + log.warning("Skipping %s - not an NTFS filesystem", fs) continue - if fs == sysvol: - name = "sysvol" - elif fs in mount_lookup: - name = mount_lookup[fs][0] + if fs in mount_lookup: + mountpoints = mount_lookup[fs] + + for main_mountpoint in mountpoints: + if main_mountpoint != "sysvol": + break + + name = main_mountpoint + mountpoints = ", ".join(mountpoints) else: + main_mountpoint = None name = f"vol-{fs.ntfs.serial:x}" + mountpoints = "No mounts" + log.warning("Unmounted NTFS filesystem found %s (%s)", fs, name) - yield fs, name, mountpoints + yield fs, main_mountpoint, name, mountpoints -def iter_esxi_filesystems(target: Target) -> Iterator[tuple[str, str, Filesystem]]: +def iter_esxi_filesystems(target: Target) -> Iterator[tuple[Filesystem, str, str, Optional[str]]]: for mount, fs in target.fs.mounts.items(): if not mount.startswith("/vmfs/volumes/"): continue @@ -200,11 +204,11 @@ def iter_esxi_filesystems(target: Target) -> Iterator[tuple[str, str, 
Filesystem elif fs.__type__ == "vmfs": name = fs.vmfs.label - yield uuid, name, fs + yield fs, mount, uuid, name -def register_module(*args, **kwargs): - def wrapper(module_cls): +def register_module(*args, **kwargs) -> Callable[[type[Module]], type[Module]]: + def wrapper(module_cls: type[Module]) -> type[Module]: name = module_cls.__name__ if name in MODULES: @@ -228,8 +232,8 @@ def wrapper(module_cls): return wrapper -def module_arg(*args, **kwargs): - def wrapper(module_cls): +def module_arg(*args, **kwargs) -> Callable[[type[Module]], type[Module]]: + def wrapper(module_cls: type[Module]) -> type[Module]: if not hasattr(module_cls, "__cli_args__"): module_cls.__cli_args__ = [] module_cls.__cli_args__.append((args, kwargs)) @@ -238,7 +242,7 @@ def wrapper(module_cls): return wrapper -def local_module(cls): +def local_module(cls: type[object]) -> object: """A decorator that sets property `__local__` on a module class to mark it for local target only""" cls.__local__ = True return cls @@ -308,22 +312,25 @@ class NTFS(Module): @classmethod def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None: - for fs, name, mountpoints in iter_ntfs_filesystems(target): - log.info("Acquiring %s (%s)", fs, mountpoints) + for fs, main_mountpoint, name, mountpoints in iter_ntfs_filesystems(target): + log.info("Acquiring from %s as %s (%s)", fs, name, mountpoints) + + for filename in ("$MFT", "$Boot", "$Secure:$SDS"): + if main_mountpoint is not None: + path = fsutil.join(main_mountpoint, filename) + collector.collect_path(path) - collector.collect_file(fs.path("$MFT"), outpath=name + "/$MFT") - collector.collect_file(fs.path("$Boot"), outpath=name + "/$Boot") + else: + # In case the NTFS filesystem is not mounted, which should not occur but + # iter_ntfs_filesystems allows for the possibility, we fall back to raw file + # collection. 
+ collector.collect_file_raw(filename, fs, name) cls.collect_usnjrnl(collector, fs, name) - cls.collect_ntfs_secure(collector, fs, name) @classmethod def collect_usnjrnl(cls, collector: Collector, fs: Filesystem, name: str) -> None: - try: - usnjrnl_path = fs.path("$Extend/$Usnjrnl:$J") - entry = usnjrnl_path.get() - journal = entry.open() - + def usnjrnl_accessor(journal: BinaryIO) -> tuple[BinaryIO, int]: # If the filesystem is a virtual NTFS filesystem, journal will be # plain BinaryIO, not a RunlistStream. if isinstance(journal, RunlistStream): @@ -331,57 +338,18 @@ def collect_usnjrnl(cls, collector: Collector, fs: Filesystem, name: str) -> Non while journal.runlist[i][0] is None: journal.seek(journal.runlist[i][1] * journal.block_size, io.SEEK_CUR) i += 1 + size = journal.size - journal.tell() + else: + size = journal.size - # Use the same method to construct the output path as is used in - # collector.collect_file() - outpath = collector._output_path(f"{name}/$Extend/$Usnjrnl:$J") - - collector.output.write( - outpath, - journal, - size=journal.size - journal.tell(), - entry=entry, - ) - collector.report.add_file_collected(cls.__name__, usnjrnl_path) - result = "OK" - except exceptions.FileNotFoundError: - collector.report.add_file_missing(cls.__name__, usnjrnl_path) - result = "File not found" - except Exception as err: - log.debug("Failed to acquire UsnJrnl", exc_info=True) - collector.report.add_file_failed(cls.__name__, usnjrnl_path) - result = repr(err) - - log.info("- Collecting file $Extend/$Usnjrnl:$J: %s", result) - - @classmethod - def collect_ntfs_secure(cls, collector: Collector, fs: Filesystem, name: str) -> None: - try: - secure_path = fs.path("$Secure:$SDS") - entry = secure_path.get() - sds = entry.open() - - # Use the same method to construct the output path as is used in - # collector.collect_file() - outpath = collector._output_path(f"{name}/$Secure:$SDS") - - collector.output.write( - outpath, - sds, - size=sds.size, - entry=entry, - ) - 
collector.report.add_file_collected(cls.__name__, secure_path) - result = "OK" - except FileNotFoundError: - collector.report.add_file_missing(cls.__name__, secure_path) - result = "File not found" - except Exception as err: - log.debug("Failed to acquire SDS", exc_info=True) - collector.report.add_file_failed(cls.__name__, secure_path) - result = repr(err) + return (journal, size) - log.info("- Collecting file $Secure:$SDS: %s", result) + collector.collect_file_raw( + "$Extend/$Usnjrnl:$J", + fs, + name, + file_accessor=usnjrnl_accessor, + ) @register_module("-r", "--registry") @@ -722,13 +690,20 @@ def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector patterns.extend(["$Recycle.Bin/$R*", "$Recycle.Bin/*/$R*", "RECYCLE*/D*"]) with collector.file_filter(large_files_filter): - for fs, name, mountpoints in iter_ntfs_filesystems(target): - log.info("Acquiring recycle bin from %s (%s)", fs, mountpoints) + for fs, main_mountpoint, name, mountpoints in iter_ntfs_filesystems(target): + log.info("Acquiring recycle bin from %s as %s (%s)", fs, name, mountpoints) for pattern in patterns: - for entry in fs.path().glob(pattern): - if entry.is_file(): - collector.collect_file(entry, outpath=fsutil.join(name, str(entry))) + if main_mountpoint is not None: + pattern = fsutil.join(main_mountpoint, pattern) + collector.collect_glob(pattern) + else: + # In case the NTFS filesystem is not mounted, which should not occur but + # iter_ntfs_filesystems allows for the possibility, we fall back to raw file + # collection. 
+ for entry in fs.path().glob(pattern): + if entry.is_file(): + collector.collect_file_raw(fs, entry, name) @register_module("--drivers") @@ -1441,21 +1416,24 @@ def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector "bootbank": "BOOTBANK1", "altbootbank": "BOOTBANK2", } - boot_fs = [] + boot_fs = {} for boot_dir, boot_vol in boot_dirs.items(): dir_path = target.fs.path(boot_dir) if dir_path.is_symlink() and dir_path.exists(): dst = dir_path.readlink() - boot_fs.append((dst.name, boot_vol, dst.get().top.fs)) + fs = dst.get().top.fs + boot_fs[fs] = boot_vol - for uuid, name, fs in boot_fs: - log.info("Acquiring /vmfs/volumes/%s (%s)", uuid, name) - base = f"fs/{uuid}:{name}" - for path in fs.path("/").rglob("*"): - if not path.is_file(): - continue - collector.collect_file(path, outpath=path, base=base) + for fs, mountpoint, uuid, _ in iter_esxi_filesystems(target): + if fs in boot_fs: + name = boot_fs[fs] + log.info("Acquiring %s (%s)", mountpoint, name) + mountpoint_len = len(mountpoint) + base = f"fs/{uuid}:{name}" + for path in target.fs.path(mountpoint).rglob("*"): + outpath = path.as_posix()[mountpoint_len:] + collector.collect_path(path, outpath=outpath, base=base) @register_module("--esxi") @@ -1478,16 +1456,16 @@ class VMFS(Module): @classmethod def _run(cls, target: Target, cli_args: argparse.Namespace, collector: Collector) -> None: - for uuid, name, fs in iter_esxi_filesystems(target): + for fs, mountpoint, uuid, name in iter_esxi_filesystems(target): if not fs.__type__ == "vmfs": continue - log.info("Acquiring /vmfs/volumes/%s (%s)", uuid, name) + log.info("Acquiring %s (%s)", mountpoint, name) + mountpoint_len = len(mountpoint) base = f"fs/{uuid}:{name}" - for path in fs.path("/").glob("*.sf"): - if not path.is_file(): - continue - collector.collect_file(path, outpath=path, base=base) + for path in target.fs.path(mountpoint).glob("*.sf"): + outpath = path.as_posix()[mountpoint_len:] + collector.collect_path(path, 
outpath=outpath, base=base) @register_module("--activities-cache") @@ -1727,7 +1705,7 @@ def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts: if log_file: files.append(log_file) if target.path.name == "local": - skip_list.add(normalize_path(target, log_file, resolve=True)) + skip_list.add(normalize_path(target, log_file, resolve_parents=True, preserve_case=False)) print_disks_overview(target) print_volumes_overview(target) @@ -1817,7 +1795,7 @@ def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts: log.info("Logging to file %s", log_path) files = [log_file_handler.baseFilename] if target.path.name == "local": - skip_list = {normalize_path(target, log_path, resolve=True)} + skip_list = {normalize_path(target, log_path, resolve_parents=True, preserve_case=False)} output_path = args.output or args.output_file if output_path.is_dir(): @@ -1833,7 +1811,7 @@ def acquire_target_regular(target: Target, args: argparse.Namespace, output_ts: ) files.append(output.path) if target.path.name == "local": - skip_list.add(normalize_path(target, output.path, resolve=True)) + skip_list.add(normalize_path(target, output.path, resolve_parents=True, preserve_case=False)) log.info("Writing output to %s", output.path) if skip_list: diff --git a/acquire/collector.py b/acquire/collector.py index d44301e4..e27f5b0d 100644 --- a/acquire/collector.py +++ b/acquire/collector.py @@ -9,10 +9,9 @@ from contextlib import contextmanager from dataclasses import dataclass from itertools import groupby -from pathlib import Path from typing import ( TYPE_CHECKING, - Any, + BinaryIO, Callable, Iterable, Optional, @@ -28,14 +27,10 @@ NotASymlinkError, SymlinkRecursionError, ) +from dissect.target.filesystem import Filesystem from dissect.target.helpers import fsutil -from acquire.utils import ( - StrEnum, - get_formatted_exception, - normalize_path, - normalize_sysvol, -) +from acquire.utils import StrEnum, get_formatted_exception, normalize_path if 
TYPE_CHECKING: from acquire.outputs.base import Output @@ -70,43 +65,34 @@ class Record: details: Optional[str] = None -def serialize_path(path: Any) -> str: - if not isinstance(path, fsutil.TargetPath): - return str(path) - - if not getattr(path, "_fs", None): - return str(path) - - # Naive way to serialize TargetPath filesystem's metadata is - # to rely on uniqueness of `path._fs` object - fs = path._fs - fs_id = id(fs) - fs_type = fs.__type__ - path = str(path) - if not fs.case_sensitive: - path = path.lower() - - return f"{fs_type}:{fs_id}:{path}" - - @dataclass class CollectionReport: - registry: set[Record] = dataclasses.field(default_factory=set) + target: Target + registry: set[Record] = dataclasses.field(default_factory=set) seen_paths: set[str] = dataclasses.field(default_factory=set) + def _uniq_path(self, path: Union[str, fsutil.TargetPath]) -> str: + path = normalize_path(self.target, path, resolve_parents=False, preserve_case=False) + # Depending on the way they are constructed, windows paths may start with a root '/' + # followed by a drive letter or start immediately with a drive letter (and no root. To make + # sure both types are matched identical, add a root if none is present. + if not path.startswith("/"): + path = f"/{path}" + + return path + def _register( self, module_name: str, outcome: Outcome, artifact_type: ArtifactType, - artifact_value: Union[str, Path], + artifact_value: Union[str, fsutil.TargetPath], details: Optional[str] = None, ) -> None: - if isinstance(artifact_value, Path): - artifact_value = serialize_path(artifact_value) - - if artifact_type in (ArtifactType.FILE, ArtifactType.DIR): + if artifact_type in (ArtifactType.FILE, ArtifactType.DIR, ArtifactType.SYMLINK, ArtifactType.PATH): + # Any path like artefacts are expected to be resolved to the level needed. 
+ artifact_value = self._uniq_path(artifact_value) self.seen_paths.add(artifact_value) self.registry.add( @@ -119,20 +105,20 @@ def _register( ) ) - def add_file_collected(self, module: str, path: Path) -> None: + def add_file_collected(self, module: str, path: fsutil.TargetPath) -> None: self._register(module, Outcome.SUCCESS, ArtifactType.FILE, path) - def add_symlink_collected(self, module: str, path: Path) -> None: + def add_symlink_collected(self, module: str, path: fsutil.TargetPath) -> None: self._register(module, Outcome.SUCCESS, ArtifactType.SYMLINK, path) - def add_symlink_failed(self, module: str, path: Path) -> None: + def add_symlink_failed(self, module: str, path: fsutil.TargetPath) -> None: self._register(module, Outcome.FAILURE, ArtifactType.SYMLINK, path) - def add_file_failed(self, module: str, failed_path: Path) -> None: + def add_file_failed(self, module: str, failed_path: fsutil.TargetPath) -> None: exc = get_formatted_exception() self._register(module, Outcome.FAILURE, ArtifactType.FILE, failed_path, exc) - def add_file_missing(self, module: str, missing_path: Path) -> None: + def add_file_missing(self, module: str, missing_path: fsutil.TargetPath) -> None: self._register(module, Outcome.MISSING, ArtifactType.FILE, missing_path) def add_glob_failed(self, module: str, failed_pattern: str) -> None: @@ -142,21 +128,24 @@ def add_glob_failed(self, module: str, failed_pattern: str) -> None: def add_glob_empty(self, module: str, pattern: str) -> None: self._register(module, Outcome.EMPTY, ArtifactType.GLOB, pattern) - def add_dir_collected(self, module: str, path: Path) -> None: + def add_dir_collected(self, module: str, path: fsutil.TargetPath) -> None: self._register(module, Outcome.SUCCESS, ArtifactType.DIR, path) - def add_dir_failed(self, module: str, failed_path: Path) -> None: + def add_dir_failed(self, module: str, failed_path: fsutil.TargetPath) -> None: exc = get_formatted_exception() self._register(module, Outcome.FAILURE, 
ArtifactType.DIR, failed_path, exc) - def add_dir_missing(self, module: str, missing_path: Path) -> None: + def add_dir_missing(self, module: str, missing_path: fsutil.TargetPath) -> None: self._register(module, Outcome.MISSING, ArtifactType.DIR, missing_path) - def add_path_failed(self, module: str, failed_path: Path) -> None: + def add_path_collected(self, module: str, path: fsutil.TargetPath) -> None: + self._register(module, Outcome.SUCCESS, ArtifactType.PATH, path) + + def add_path_failed(self, module: str, failed_path: fsutil.TargetPath) -> None: exc = get_formatted_exception() self._register(module, Outcome.FAILURE, ArtifactType.PATH, failed_path, exc) - def add_path_missing(self, module: str, missing_path: Path) -> None: + def add_path_missing(self, module: str, missing_path: fsutil.TargetPath) -> None: self._register(module, Outcome.MISSING, ArtifactType.PATH, missing_path) def add_command_collected(self, module: str, command_parts: Sequence[str]) -> None: @@ -189,8 +178,9 @@ def get_counts_per_module_per_outcome(self) -> dict[str, dict[str, int]]: records_map[module][outcome] = len(records_per_module_outcome) return records_map - def was_path_seen(self, path: Path) -> bool: - return serialize_path(path) in self.seen_paths + def was_path_seen(self, path: str | fsutil.TargetPath) -> bool: + path = self._uniq_path(path) + return path in self.seen_paths class Collector: @@ -203,7 +193,7 @@ def __init__(self, target: Target, output: Output, base: str = "fs", skip_list: self.base = base self.skip_list = skip_list or set() - self.report = CollectionReport() + self.report = CollectionReport(target) self.bound_module_name = None self.filter = lambda _: False @@ -241,24 +231,32 @@ def unbind(self) -> None: def close(self) -> None: self.output.close() - def _output_path(self, path: Path, base: Optional[str] = None) -> str: - base = base or self.base - outpath = str(path) + def _output_path(self, path: str | fsutil.TargetPath, base: Optional[str] = None) -> str: + if 
base is None: + base = self.base - if sysvol_drive := self.target.props.get("sysvol_drive"): - outpath = normalize_sysvol(outpath, sysvol_drive) + # When constructing an output path from a collected path, normalization generally already + # happened and is not needed, so this will be a no-op. However when constructing an output + # path based on an explicitly provided output path, it is nice to be able to normalize any + # sysvol part to an actual driveletter. + outpath = normalize_path(self.target, path, resolve_parents=False, preserve_case=True) if base: - # Make sure that `outpath` is not an abolute path, since - # `fsutil.join()` (that uses `posixpath.join()`) discards all previous path - # components if an encountered component is an absolute path. + base = base.strip("/") + # Make sure that `outpath` is not an abolute path, since `fsutil.join()` (which uses + # `posixpath.join()`) discards all previous path components if an encountered component + # is an absolute path. outpath = outpath.lstrip("/") outpath = fsutil.join(base, outpath) return outpath def collect( - self, spec: Iterable, module_name: Optional[str] = None, follow: bool = True, volatile: bool = False + self, + spec: Iterable, + module_name: Optional[str] = None, + follow: bool = True, + volatile: bool = False, ) -> None: module_name = self.bound_module_name or module_name if not module_name: @@ -277,10 +275,8 @@ def collect( values = [value] for value in values: - if artifact_type in (ArtifactType.FILE, ArtifactType.DIR): - self.collect_path(value, module_name=module_name, follow=follow, volatile=volatile) - elif artifact_type == ArtifactType.SYMLINK: - self.collect_symlink(value, module_name=module_name) + if artifact_type in (ArtifactType.FILE, ArtifactType.DIR, ArtifactType.SYMLINK, ArtifactType.PATH): + self.collect_path(value, module_name=module_name, volatile=volatile) elif artifact_type == ArtifactType.GLOB: self.collect_glob(value, module_name=module_name) elif artifact_type == 
ArtifactType.COMMAND: @@ -289,15 +285,73 @@ def collect( else: raise ValueError("Unknown artifact type %s in spec: %s", artifact_type, spec) - def collect_file( + def _get_symlink_branches(self, path: fsutil.TargetPath) -> (fsutil.TargetPath, list[fsutil.TargetPath]): + """Given a ``path`` that contains symlinks in any of its intermediate parts, collect all these + intermediate branches that end in a symlink. + + Args: + path: The path to collect the branches for. It is assumed to be normalized with respect to path + separators and Windows device root and sysvol parts. + + Returns: + A tuple of the full path with all intermediaries resolved except for its final part and a list of + the collected intermediate symlink branches. + """ + cur_path = None + branches = [] + + for path_part in path.parts[:-1]: + if cur_path is None: + cur_path = self.target.fs.path(path_part) + else: + cur_path = cur_path.joinpath(path_part) + + if cur_path.is_symlink(): + branches.append(cur_path) + + # resolve() fully resolves cur_path, so there is no use in + # recursively calling _get_symlink_branches(), we only need to walk + # over the remaining parts to see if any of them are symlinks. + cur_path = cur_path.resolve() + + last_part = path.parts[-1] + path = cur_path.joinpath(last_part) + + return path, branches + + def collect_path( self, - path: Union[str, fsutil.TargetPath], - size: Optional[int] = None, + path: str | fsutil.TargetPath, outpath: Optional[str] = None, module_name: Optional[str] = None, base: Optional[str] = None, volatile: bool = False, + seen_paths: set[fsutil.TargetPath] = None, ) -> None: + """Collect a path from the target's root filesystem, including any intermediary symlinks. + + Args: + path: The path to collect (this may be a file, directory or symlink). + outpath: A posix style explicit path where to store the collected path. In case ``path`` + is a directory this will be the new base directory. 
It is concatenated with + ``base`` to get the final output path. Windows device path and sysvol parts are + normalized. When set, intermediate symlinks of ``path`` are not collected. When + not set, it will be constructed from the given ``path``. + module_name: When set it indicates the module doing the collection, used for logging and + reporting. When not set the :class:``Collector``'s ``bound_module`` will be + used. + base: A different base path to use to store the file, it is prepended to the given or + generated ``outpath``. + volatile: When this flag is set, the collection of a number of artefacts is done + slightly different: + - symlinks at the end of a path will not be collected, + - empty directories will be collected, + - files will be collected in a slower but more robust way, any errors while + reading the bytes will not fail the collection of the file and all bytes + already retrieved will be stored. + seen_paths: A list of normalized path strings, used when calling this function + recursively to collect directories to break out of symlink loops. + """ module_name = self.bound_module_name or module_name if not module_name: raise ValueError("Module name must be provided or Collector needs to be bound to a module") @@ -305,98 +359,291 @@ def collect_file( if not isinstance(path, fsutil.TargetPath): path = self.target.fs.path(path) - if self.filter(path) is True: - log.info("- Collecting file %s: Skipped (filtered out)", path) - return + log.debug("- Collecting path %s", path) + # This dedup is a shortcut as when the normalized path and, optionally, its intermediary + # symlinks are collected, the orignal non-normalized path is also added to the report and + # dedup list. This prevents rerunning a number of normalizing steps to find out if the + # normalized version of the path should be deduplicated. 
if self.report.was_path_seen(path): - log.info("- Collecting file %s: Skipped (DEDUP)", path) + log.info("- Collecting path %s: Skipped (DEDUP)", path) return - outpath = self._output_path(outpath or path, base) + # If a path is used in any of the report.add_path_*() functions, it is used for + # deduping. In case of errors and depending on the processing stage, the path that + # resulted in these errors changes. + error_path = path try: - entry = path.get() - if volatile: - self.output.write_volatile(outpath, entry, size=size) + if outpath: + # If an outpath is explicitly provided, there is no use to store any of the + # intermediate symlinks to the original path. + collect_inpath = normalize_path(self.target, path, resolve_parents=True, preserve_case=True) else: - self.output.write_entry(outpath, entry, size=size) + # If there is no explicit outpath, the branch collection will resolve the parents. + # ONLY REPLACE device root, sysvol & path seps. + collect_inpath = normalize_path(self.target, path, resolve_parents=False, preserve_case=True) + collect_inpath = self.target.fs.path(collect_inpath) + + error_path = collect_inpath + + # For breaking out of symlink loops and skipping files from the skip_list we need a + # fully normalized path except for resolving the final part. + # RESOLVE parents, REPLACE device root, sysvol, path seps & casing + os_clean_path = normalize_path(self.target, path, resolve_parents=True, preserve_case=False) + + # If direct_collect is True, it indicates collect_path() was not called recursively. + # This is useful info to log errors in case a directory was tried to collect but it was + # empty. 
+ direct_collect = False + + if seen_paths is None: + seen_paths = set() + direct_collect = True + elif os_clean_path in seen_paths: + self.report.add_path_failed(module_name, path) + log.error("- Skipping collection of %s, breaking out of symlink loop", path) + return - self.report.add_file_collected(module_name, path) - result = "OK" - except FileNotFoundError: - self.report.add_file_missing(module_name, path) - result = "File not found" - except Exception as exc: - log.error("Failed to collect file", exc_info=True) - self.report.add_file_failed(module_name, path) - result = repr(exc) + seen_paths.add(os_clean_path) - log.info("- Collecting file %s: %s", path, result) + if self.skip_list and os_clean_path in self.skip_list: + self.report.add_path_failed(module_name, path) + log.info("- Skipping collection of %s, path is on the skip list", path) + return - def collect_symlink(self, path: fsutil.TargetPath, module_name: Optional[str] = None) -> None: - try: - outpath = self._output_path(path) - self.output.write_entry(outpath, path.get()) + # If a path does not exist, is_dir(), is_file() and is_symlink() will return False (and + # not raise an exception), so we need to explicitly trigger an exception for this using + # collect_inpath.lstat(). + path_entry = collect_inpath.lstat() + is_dir = collect_inpath.is_dir() + is_file = collect_inpath.is_file() + is_symlink = collect_inpath.is_symlink() + + branches = [] + if not outpath: + collect_inpath, branches = self._get_symlink_branches(collect_inpath) + + # If the collect_inpath and branches resulting from path are all skipped due to deduping, + # we don't want to report success of collecting path. + all_deduped = True + if self.report.was_path_seen(collect_inpath): + # The collect_inpath is skipped, but any symlink branches will still be collected, + # as we may not have collected this file through the specific symlinks set in path. 
+ log.info("- Collecting path %s: Skipped (DEDUP)", collect_inpath) + + elif self.filter(collect_inpath): + log.info("- Collecting path %s: Skipped (filtered out)", collect_inpath) + # No need to collect the symlink branches, as they would point to nowhere. + return - self.report.add_symlink_collected(module_name, path) - result = "OK" - except Exception as exc: - self.report.add_symlink_failed(module_name, path) - log.error("Failed to collect symlink %s (-> %s)", path, path.readlink(), exc_info=True) - result = repr(exc) + else: + all_deduped = False + collect_outpath = self._output_path(outpath or collect_inpath, base) + + if is_symlink: + log.info("- Collecting symlink %s to: %s", collect_inpath, collect_outpath) + self.output.write_entry(collect_outpath, path_entry) + self.report.add_symlink_collected(module_name, collect_inpath) + log.info("- Collecting symlink %s succeeded", collect_inpath) + + if not volatile: + self.collect_path( + collect_inpath.resolve(), + # If explicitly provided, the symlink itself was already saved as outpath, where it + # links to wil be saved under its own name. + outpath=None, + module_name=module_name, + base=base, + volatile=volatile, + seen_paths=seen_paths, + ) + + elif is_dir: + dir_is_empty = True + for entry in collect_inpath.iterdir(): + dir_is_empty = False + + # If an explicit outpath was provided, we store all entries on top of the provided + # outpath. 
+ if outpath: + outpath = fsutil.join(outpath, entry.name) + + self.collect_path( + entry, + outpath=outpath, + module_name=module_name, + base=base, + volatile=volatile, + seen_paths=seen_paths, + ) + + if dir_is_empty: + if direct_collect and not volatile: + self.report.add_dir_failed(module_name, collect_inpath) + log.error("- Failed to collect directory %s, it is empty", collect_inpath) + return + + if volatile: + log.info("- Collecting EMPTY directory %s to: %s", collect_inpath, collect_outpath) + self.output.write_entry(collect_outpath, collect_inpath) + self.report.add_dir_collected(module_name, collect_inpath) + log.info("- Collecting EMPTY directory %s succeeded", collect_inpath) + + elif is_file: + log.info("- Collecting file %s to: %s", collect_inpath, collect_outpath) + if volatile: + self.output.write_volatile(collect_outpath, path_entry) + else: + self.output.write_entry(collect_outpath, path_entry) + self.report.add_file_collected(module_name, collect_inpath) + log.info("- Collecting file %s succeeded", collect_inpath) - log.info("- Collecting symlink %s: %s", path, result) + else: + self.report.add_path_failed(module_name, path) + log.error("- Don't know how to collect %s in module %s", path, module_name) + return + + # All branches are symlinks, collect them as such. If an explicit outpath is set, the list of + # branches will be empty. 
+ for branch_path in branches: + log.info("- Collecting symlink branch path %s", branch_path) + error_path = branch_path + if self.report.was_path_seen(branch_path): + log.info("- Collecting symlink branch path %s: Skipped (DEDUP)", branch_path) + else: + all_deduped = False + outpath = self._output_path(branch_path, base) + self.output.write_entry(outpath, branch_path.get()) + self.report.add_symlink_collected(module_name, branch_path) + log.info("- Collecting symlink branch suceeded %s", branch_path) - def collect_dir( + except OSError as error: + if error.errno == errno.ENOENT: + self.report.add_path_missing(module_name, error_path) + log.error("- Path %s is not found (while collecting %s)", error_path, path) + elif error.errno == errno.EACCES: + self.report.add_path_failed(module_name, error_path) + log.error("- Permission denied while accessing path %s (while collecting %s)", error_path, path) + else: + self.report.add_path_failed(module_name, error_path) + log.error("- OSError while collecting path %s (while collecting %s)", error_path, path) + except (FileNotFoundError, NotADirectoryError, NotASymlinkError, SymlinkRecursionError, ValueError): + self.report.add_path_missing(module_name, error_path) + log.error("- Path %s is not found (while collecting %s)", error_path, path) + except Exception: + self.report.add_path_failed(module_name, error_path) + log.error("- Failed to collect path %s (while collecting %s)", error_path, path, exc_info=True) + else: + if not all_deduped and collect_inpath != path: + self.report.add_path_collected(module_name, path) + log.debug("- Collecting path %s succeeded", path) + + def collect_file_raw( self, - path: Union[str, fsutil.TargetPath], - seen_paths: Optional[set] = None, + path: str | fsutil.TargetPath, + fs: Filesystem, + mountpoint: str, + outpath: Optional[str] = None, module_name: Optional[str] = None, - follow: bool = True, - volatile: bool = False, + base: Optional[str] = None, + file_accessor: 
Optional[Callable[[BinaryIO, int], BinaryIO]] = None, ) -> None: + """Collect a single file from one of the target's filesystems. + + Args: + path: The path to the file to collect. This path will be fully resolved before + collecting and constructing the output path. + fs: The filesystem to collect the path from. + mountpoint: The (possibly fake) mountpoint of the given filesystem, to make the path + unique within the target. If ``outpath`` is not supplied it will be + concatenated with ``path`` and ``base`` to construct the ``outpath``. + outpath: A posix style explicit path where to store the collected file. It is + concatenated with ``base`` to get the final output path. Windows device path + and sysvol parts are normalized. When not set, it will be constructed from the + given ``path``. + module_name: Indicates the module doing the collection, used for logging and reporting. + It is only used when the ``Collector`` is not bound to a module (the bound name wins). + base: A different base path to use to store the file, it is prepended to the given or + generated ``outpath``. + file_accessor: Optional callable, called with the opened file handle, returning the ``(fh, size)`` to write. + """ module_name = self.bound_module_name or module_name if not module_name: raise ValueError("Module name must be provided or Collector needs to be bound to a module") if not isinstance(path, fsutil.TargetPath): - path = self.target.fs.path(path) + path = fs.path(path) - log.info("- Collecting directory %s", path) + # As path is not unique on the target, collect_inpath is constructed to be a unique (but fake) + # path. The actual file entry collected comes from path, collect_inpath is used for logging, + # reporting and deduplication purposes. This needs to be set here to be able to log and + # deduplicate in case exceptions are raised early on.
+ collect_inpath = fsutil.join(mountpoint, path.as_posix().lstrip("/")) - seen_paths = seen_paths or set() try: - resolved = path.resolve() - if resolved in seen_paths: - log.debug("Breaking out of symlink loop: path %s linking to %s", path, resolved) + # As we don't collect any intermediate or end symlinks, the path needs to be fully + # resolved. + path = path.resolve() + + if self.filter(path): + log.info("- Collecting path %s: Skipped (filtered out)", collect_inpath) + return + + # In general normalization will not do much as the path is already fully resolved and + # passed in posix form. Also files on non-root filesystems generally don't have any + # drive path or driveletter part. + collect_inpath = normalize_path(self.target, path.as_posix(), resolve_parents=False, preserve_case=True) + if mountpoint: + collect_inpath = fsutil.join(mountpoint, collect_inpath.lstrip("/")) + + if self.report.was_path_seen(collect_inpath): + log.info("- Collecting path %s (%s on %s): Skipped (DEDUP)", collect_inpath, path, fs) + return + + entry = path.get() + + if not path.is_file(): + log.error("- Failed to collect path %s (%s on %s): not a file", collect_inpath, path, fs) + self.report.add_file_failed(module_name, collect_inpath) return - seen_paths.add(resolved) - dir_is_empty = True - for entry in path.iterdir(): - dir_is_empty = False - self.collect_path( - entry, seen_paths=seen_paths, module_name=module_name, follow=follow, volatile=volatile - ) + log.info("- Collecting file %s (%s on %s)", collect_inpath, path, fs) + + collect_outpath = self._output_path(outpath or collect_inpath, base=base) - if dir_is_empty and volatile: - outpath = self._output_path(path) - self.output.write_entry(outpath, path) + fh = entry.open() + if file_accessor is not None: + fh, size = file_accessor(fh) + else: + size = fh.size + + self.output.write( + collect_outpath, + fh, + entry, + size=size, + ) except OSError as error: if error.errno == errno.ENOENT: - 
self.report.add_dir_missing(module_name, path) - log.error("- Directory %s is not found", path) + self.report.add_file_missing(module_name, collect_inpath) + log.error("- File %s (%s on %s) is not found", collect_inpath, path, fs) elif error.errno == errno.EACCES: - self.report.add_dir_failed(module_name, path) - log.error("- Permission denied while accessing directory %s", path) + self.report.add_file_failed(module_name, collect_inpath) + log.error("- Permission denied while accessing file %s (%s on %s)", collect_inpath, path, fs) else: - self.report.add_dir_failed(module_name, path) - log.error("- OSError while collecting directory %s (%s)", path, error) + self.report.add_file_failed(module_name, collect_inpath) + log.error("- OSError while collecting file %s (%s on %s)", collect_inpath, path, fs) + except (FileNotFoundError, NotADirectoryError, NotASymlinkError, SymlinkRecursionError, ValueError): + self.report.add_file_missing(module_name, collect_inpath) + log.error("- File %s (%s on %s) not found", collect_inpath, path, fs) except Exception: - self.report.add_dir_failed(module_name, path) - log.error("- Failed to collect directory %s", path, exc_info=True) + self.report.add_file_failed(module_name, collect_inpath) + log.error("- Failed to collect file %s (%s on %s)", collect_inpath, path, fs, exc_info=True) + else: + self.report.add_file_collected(module_name, collect_inpath) + log.info("- Collecting file %s (%s on %s) succeeded", collect_inpath, path, fs) def collect_glob(self, pattern: str, module_name: Optional[str] = None) -> None: module_name = self.bound_module_name or module_name @@ -415,72 +662,10 @@ def collect_glob(self, pattern: str, module_name: Optional[str] = None) -> None: else: if glob_is_empty: self.report.add_glob_empty(module_name, pattern) + log.error("- Failed to collect glob %s, it is empty", pattern) else: log.info("- Collecting glob %s succeeded", pattern) - def collect_path( - self, - path: Union[str, fsutil.TargetPath], - seen_paths: 
Optional[set] = None, - module_name: Optional[str] = None, - follow: bool = True, - volatile: bool = False, - ) -> None: - module_name = self.bound_module_name or module_name - if not module_name: - raise ValueError("Module name must be provided or Collector needs to be bound to a module") - - if not isinstance(path, fsutil.TargetPath): - path = self.target.fs.path(path) - - if self.skip_list and normalize_path(self.target, path) in self.skip_list: - self.report.add_path_failed(module_name, path) - log.error("- Skipping collection of %s, path is on the skip list", path) - return - - try: - # If a path does not exist, is_dir(), is_file() and is_symlink() will return False (and not raise an - # exception), so we need to explicitly trigger an exception for this using path.get(). - path.get() - is_dir = path.is_dir() - is_file = path.is_file() - is_symlink = path.is_symlink() - except OSError as error: - if error.errno == errno.ENOENT: - self.report.add_path_missing(module_name, path) - log.error("- Path %s is not found", path) - elif error.errno == errno.EACCES: - self.report.add_path_failed(module_name, path) - log.error("- Permission denied while accessing path %s", path) - else: - self.report.add_path_failed(module_name, path) - log.error("- OSError while collecting path %s", path) - return - except (FileNotFoundError, NotADirectoryError, NotASymlinkError, SymlinkRecursionError, ValueError): - self.report.add_path_missing(module_name, path) - log.error("- Path %s is not found", path) - return - except Exception: - self.report.add_path_failed(module_name, path) - log.error("- Failed to collect path %s", path, exc_info=True) - return - - if is_symlink: - self.collect_symlink(path, module_name=module_name) - - if follow: - # Follow the symlink, call ourself again with the resolved path - self.collect_path( - path.resolve(), seen_paths=seen_paths, module_name=module_name, follow=follow, volatile=volatile - ) - elif is_dir: - self.collect_dir(path, 
seen_paths=seen_paths, module_name=module_name, follow=follow, volatile=volatile) - elif is_file: - self.collect_file(path, module_name=module_name, volatile=volatile) - else: - self.report.add_path_failed(module_name, path) - log.error("- Don't know how to collect %s in module %s", path, module_name) - def collect_command_output( self, command_parts: list[str], @@ -503,6 +688,7 @@ def collect_command_output( self.report.add_command_failed(module_name, command_parts) log.error("- Failed to collect output from command `%s`", " ".join(command_parts), exc_info=True) return + log.info("- Collecting output from command `%s` succeeded", " ".join(command_parts)) def write_bytes(self, destination_path: str, data: bytes) -> None: self.output.write_bytes(destination_path, data) diff --git a/acquire/utils.py b/acquire/utils.py index 325924dc..eccc0eef 100644 --- a/acquire/utils.py +++ b/acquire/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import argparse import ctypes import datetime @@ -383,25 +385,37 @@ def persist_execution_report(path: Path, report_data: dict) -> Path: DEVICE_SUBST = re.compile(r"^(/\?\?/)") -SYSVOL_SUBST = re.compile(r"^/?sysvol(?=/)", flags=re.IGNORECASE) +SYSVOL_SUBST = re.compile(r"^/?sysvol(?=/|$)", flags=re.IGNORECASE) + +SYSVOL_UPPER_SUBST = re.compile(r"^(/?SYSVOL)(?=/|$)") +DRIVE_LOWER_SUBST = re.compile(r"^(/?[a-z]:)(?=/|$)") -def normalize_path(target: Target, path: Path, *, resolve: bool = False, lower_case: bool = True) -> str: - if resolve: - path = path.resolve() +def normalize_path( + target: Target, + path: str | Path, + resolve_parents: bool = False, + preserve_case: bool = True, +) -> str: + if isinstance(path, Path): + if resolve_parents: + path = path.parent.resolve().joinpath(path.name) - path = path.as_posix() + path = path.as_posix() if target.os == "windows": path = DEVICE_SUBST.sub("", path) if sysvol_drive := target.props.get("sysvol_drive"): - path = normalize_sysvol(path, sysvol_drive) + path = 
SYSVOL_SUBST.sub(sysvol_drive, path) + + # The substitutions below are temporary until we have proper full path name uniformization + # for case insensitive filesystems. + # Replace any uppercase SYSVOL path, with a lowercase version. + path = SYSVOL_UPPER_SUBST.sub(lambda pat: pat.group(1).lower(), path) + # Replace any lower case driveletter path with an uppercase version. + path = DRIVE_LOWER_SUBST.sub(lambda pat: pat.group(1).upper(), path) - if not target.fs.case_sensitive and lower_case: + if not target.fs.case_sensitive and not preserve_case: path = path.lower() return path - - -def normalize_sysvol(path: str, sysvol: str) -> str: - return SYSVOL_SUBST.sub(sysvol, path) diff --git a/tests/conftest.py b/tests/conftest.py index acbb894e..b9d11a8e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,6 +19,10 @@ def mock_fs(mock_file: BinaryIO) -> VirtualFilesystem: fs.map_file_entry("/foo/bar/own-file", VirtualFile(fs, "own-file", mock_file)) fs.map_file_entry("/foo/bar/some-symlink", VirtualSymlink(fs, "some-symlink", "/foo/bar/some-file")) fs.map_file_entry("/foo/own-symlink", VirtualSymlink(fs, "own-symlink", "/foo/bar/own-file")) + + fs.map_file_entry("/symlink/dir1", VirtualSymlink(fs, "dir1", "/symlink/dir2")) + fs.map_file_entry("/symlink/dir2/some-dir", VirtualSymlink(fs, "some-dir", "/symlink/dir3/some-dir")) + fs.map_file_entry("/symlink/dir3/some-dir/some-file", VirtualFile(fs, "some-file", mock_file)) return fs diff --git a/tests/test_collector.py b/tests/test_collector.py index 67d84a4d..05652b4f 100644 --- a/tests/test_collector.py +++ b/tests/test_collector.py @@ -1,6 +1,6 @@ import errno -from pathlib import Path -from unittest.mock import MagicMock, Mock, patch +from typing import Optional +from unittest.mock import MagicMock, patch import pytest from dissect.target import Target @@ -10,30 +10,9 @@ NotASymlinkError, SymlinkRecursionError, ) -from dissect.target.filesystem import VirtualFilesystem +from dissect.target.helpers.fsutil 
import TargetPath -from acquire.collector import CollectionReport, Collector, Outcome -from acquire.outputs.base import Output - - -def test_collector(mock_target: Target) -> None: - with patch("acquire.collector.log", autospec=True) as mock_log: - fs_1 = VirtualFilesystem() - fs_1.map_file("$MFT", None) - mock_target.fs.mount("C:", fs_1) - mock_target.filesystems.add(fs_1) - - fs_2 = VirtualFilesystem() - fs_2.map_file("$MFT", None) - mock_target.fs.mount("D:", fs_2) - mock_target.filesystems.add(fs_2) - - collector = Collector(mock_target, Mock()) - - collector.collect_dir("C:", module_name="test") - collector.collect_dir("D:", module_name="test") - - assert not mock_log.info.call_args.args[0] == "- Collecting file %s: Skipped (DEDUP)" +from acquire.collector import ArtifactType, CollectionReport, Collector @pytest.fixture @@ -47,168 +26,404 @@ def mock_collector(mock_target: Target) -> Collector: MOCK_MODULE_NAME = "DUMMY" -def test_collector_collect_path_no_module_name(mock_collector: Collector) -> None: - with pytest.raises(ValueError): - mock_collector.collect_path("/some/path") +@pytest.mark.parametrize( + "path_str, expected", + [ + ("some/path", "/some/path"), + ("/some/path", "/some/path"), + ("some/path/", "/some/path/"), + ], +) +def test_collection_report__uniq_path(mock_target: Target, path_str: str, expected: str) -> None: + path = mock_target.fs.path(path_str) + with patch("acquire.collector.normalize_path", return_value=path_str, autospec=True): + test_report = CollectionReport(mock_target) + assert test_report._uniq_path(path_str) == expected + assert test_report._uniq_path(path) == expected -def test_collector_collect_path_dir_as_target_path(mock_target: Target, mock_collector: Collector) -> None: - with patch.object(mock_collector, "collect_dir", autospec=True): - path = mock_target.fs.path("/foo/bar") - mock_collector.collect_path( - path, - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - 
mock_collector.collect_dir.assert_called() +def test_collector_collect_no_module_name(mock_collector: Collector) -> None: + with pytest.raises(ValueError): + mock_collector.collect([[ArtifactType.PATH, "/some/path"]]) -def test_collector_collect_path_dir(mock_collector: Collector) -> None: - with patch.object(mock_collector, "collect_dir", autospec=True): - mock_collector.collect_path( - "/foo/bar", - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - mock_collector.collect_dir.assert_called() +def test_collector_collect_invalid_artifact_type(mock_collector: Collector) -> None: + with pytest.raises(ValueError): + mock_collector.collect([["dummy_type", "/some/path"]], MOCK_MODULE_NAME) -def test_collector_collect_path_file(mock_collector: Collector) -> None: - with patch.object(mock_collector, "collect_file", autospec=True): - mock_collector.collect_path( - "/foo/bar/some-file", - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - mock_collector.collect_file.assert_called() +def test_collector_collect_transform_func(mock_collector: Collector) -> None: + mock_transform = MagicMock() + with patch.object(mock_collector, "collect_path", autospec=True): + mock_collector.collect([[ArtifactType.PATH, "/some/path", mock_transform]], MOCK_MODULE_NAME) -def test_collector_collect_path_symlink(mock_collector: Collector) -> None: - with patch.object(mock_collector, "collect_symlink", autospec=True), patch.object( - mock_collector, "collect_file", autospec=True - ): - mock_collector.collect_path( - "/foo/bar/some-symlink", - follow=False, - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - mock_collector.collect_symlink.assert_called() - mock_collector.collect_file.assert_not_called() + assert mock_transform.call_args.args == (mock_collector.target, "/some/path") -def test_collector_collect_path_symlink_follow(mock_collector: Collector) -> None: - with patch.object(mock_collector, "collect_symlink", autospec=True), patch.object( - 
mock_collector, "collect_file", autospec=True +@pytest.mark.parametrize( + "spec, collect_func", + [ + ([ArtifactType.FILE, "/some/path"], "collect_path"), + ([ArtifactType.DIR, "/some/path"], "collect_path"), + ([ArtifactType.SYMLINK, "/some/path"], "collect_path"), + ([ArtifactType.PATH, "/some/path"], "collect_path"), + ([ArtifactType.GLOB, "/some/glob*"], "collect_glob"), + ([ArtifactType.COMMAND, [["./some", "--command"], "output_file"]], "collect_command_output"), + ], +) +def test_collector_collect(mock_collector: Collector, spec, collect_func) -> None: + with ( + patch.object(mock_collector, "collect_path", autospec=True), + patch.object(mock_collector, "collect_glob", autospec=True), + patch.object(mock_collector, "collect_command_output", autospec=True), ): - mock_collector.collect_path( - "/foo/bar/some-symlink", - follow=True, - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - mock_collector.collect_symlink.assert_called() - mock_collector.collect_file.assert_called() + mock_collector.collect([spec], MOCK_MODULE_NAME) + + called_func = getattr(mock_collector, collect_func) + + if spec[0] == ArtifactType.COMMAND: + assert called_func.call_args.args[0] == spec[1][0] + assert called_func.call_args.args[1] == spec[1][1] + else: + assert called_func.call_args.args[0] == spec[1] + + assert called_func.call_args.kwargs["module_name"] == MOCK_MODULE_NAME @pytest.mark.parametrize( - "path, symlink_called, file_called", + "path_str, base, expected", [ - ( - "/foo/bar/own-file", - False, - False, - ), - ( - "/foo/own-symlink", - True, - False, - ), - ( - "/foo/bar/some-file", - False, - True, - ), - ( - "/foo/bar/some-symlink", - True, - True, - ), + ("/some/path", None, "fs/some/path"), + ("some/path", None, "fs/some/path"), + ("/some/path/", None, "fs/some/path/"), + ("/some/path", "/bar/", "bar/some/path"), + ("some/path", "/bar/", "bar/some/path"), + ("/some/path/", "/bar/", "bar/some/path/"), + ], +) +def test_collector__output_path( + 
mock_target: Target, + mock_collector: Collector, + path_str: str, + base: Optional[str], + expected: str, +) -> None: + path = mock_target.fs.path(path_str) + + with patch("acquire.collector.normalize_path", return_value=path_str, autospec=True): + assert mock_collector._output_path(path_str, base=base) == expected + assert mock_collector._output_path(path, base=base) == expected + + +def test_collector__get_symlink_branches(mock_target: Target, mock_collector: Collector) -> None: + path = mock_target.fs.path("/symlink/dir1/some-dir/some-file") + path, branches = mock_collector._get_symlink_branches(path) + + assert path == mock_target.fs.path("/symlink/dir3/some-dir/some-file") + assert branches == [ + mock_target.fs.path("/symlink/dir1"), + mock_target.fs.path("/symlink/dir2/some-dir"), + ] + + +def test_collector_collect_path_no_module_name(mock_collector: Collector) -> None: + with pytest.raises(ValueError): + mock_collector.collect_path("/some/path") + + +@pytest.mark.parametrize( + "outpath, base, volatile, as_targetpath", + [ + (None, None, False, True), + (None, None, False, False), + (None, None, True, True), + ("/some/other/path", None, False, True), + ("/some/other/path", "/my/base", False, True), + ("/some/other/path", None, True, True), + ("/some/other/path", "/my/base", True, True), ], ) -def test_collector_collect_path_skip_list( - mock_collector: Collector, path: str, symlink_called: bool, file_called: bool +def test_collector_collect_path_with_file( + mock_target: Target, + mock_collector: Collector, + outpath: str, + base: str, + volatile: bool, + as_targetpath: bool, ) -> None: + # We use a path that does not need to be modified by the normalize_path() + # function, so we can easily use it to check if it was properly used in the + # writer function call. 
+ path_str = "/foo/bar/some-file" + path = mock_target.fs.path(path_str) + collect_path = path if as_targetpath else path_str + writer = { + False: "write_entry", + True: "write_volatile", + } + with ( - patch.object(mock_collector, "skip_list", new={"/foo/bar/own-file"}), - patch.object(mock_collector, "collect_symlink", autospec=True), - patch.object(mock_collector, "collect_file", autospec=True), + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_report, ): mock_collector.collect_path( - path, - follow=True, - seen_paths=MOCK_SEEN_PATHS, + collect_path, + outpath=outpath, module_name=MOCK_MODULE_NAME, + base=base, + volatile=volatile, ) - if symlink_called: - mock_collector.collect_symlink.assert_called() - else: - mock_collector.collect_symlink.assert_not_called() - if file_called: - mock_collector.collect_file.assert_called() - else: - mock_collector.collect_file.assert_not_called() + outpath = mock_collector._output_path(outpath or path, base=base) + writer_func = getattr(mock_collector.output, writer.get(volatile)) + writer_func.assert_called_once() + assert writer_func.call_args.args[0] == outpath -def test_collector_collect_glob(mock_collector: Collector) -> None: - with patch.object(mock_collector, "collect_file", autospec=True), patch.object( - mock_collector, "report", autospec=True + assert mock_report.call_args.args == (MOCK_MODULE_NAME, path) + + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + call_args = ("- Collecting file %s succeeded", path) + assert call_args in info_log_call_args + + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", path) + + +def test_collector_collect_path_early_dedup(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-file" + path = mock_target.fs.path(path_str) + + with patch("acquire.collector.log", autospec=True) as mock_log: + 
mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_called_once() + mock_collector.output.write_volatile.assert_not_called() + + assert mock_log.info.call_args.args == ("- Collecting path %s: Skipped (DEDUP)", path) + + +def test_collector_collect_path_early_dedup_mocked(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-file" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "was_path_seen", autospec=True, return_value=True), ): - mock_collector.collect_glob( - "/foo/bar/*", - module_name=MOCK_MODULE_NAME, - ) - assert len(mock_collector.collect_file.mock_calls) == 3 - assert mock_collector.collect_file.call_args.kwargs.get("module_name", None) == MOCK_MODULE_NAME + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + assert mock_log.info.call_args.args == ("- Collecting path %s: Skipped (DEDUP)", path) + + +def test_collector_collect_path_in_seen_paths(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-file" + path = mock_target.fs.path(path_str) -def test_collector_collect_path_non_existing_file(mock_collector: Collector) -> None: with ( patch("acquire.collector.log", autospec=True) as mock_log, - patch.object(mock_collector, "report", autospec=True) as mock_report, + patch.object(mock_collector.report, "add_path_failed", autospec=True) as mock_report, ): - mock_collector.collect_path( - "/foo/bar/non-existing-file", - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - mock_report.add_path_missing.assert_called() - mock_log.error.assert_called() - assert mock_log.error.call_args.args[0] == "- Path %s is not found" + 
mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME, seen_paths={path_str}) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + + assert mock_report.call_args.args == (MOCK_MODULE_NAME, path) + + assert mock_log.error.call_args.args == ("- Skipping collection of %s, breaking out of symlink loop", path) + + +def test_collector_collect_path_in_skiplist(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-file" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_path_failed", autospec=True) as mock_report, + patch.object(mock_collector, "skip_list", new=[path_str]), + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + assert mock_report.call_args.args == (MOCK_MODULE_NAME, path) + + assert mock_log.info.call_args.args == ("- Skipping collection of %s, path is on the skip list", path) + + +def test_collector_collect_path_with_filter(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-file" + path = mock_target.fs.path(path_str) -def test_collector_collect_path_no_file_type(mock_target: Target, mock_collector: Collector) -> None: - path = mock_target.fs.path("/foo/bar/non-existing-file") with ( patch("acquire.collector.log", autospec=True) as mock_log, - patch.object(mock_collector, "report", autospec=True) as mock_report, + patch.object(mock_collector, "filter", return_value=True, autospec=True), + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + + assert mock_log.info.call_args.args == ("- Collecting path %s: Skipped (filtered out)", path) + + +def 
test_collector_collect_path_unknown_type(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/non-existing-file" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector, "report") as mock_report, + patch.object(mock_collector.report, "was_path_seen", return_value=False), + patch("acquire.collector.normalize_path", return_value=path_str, autospec=True), patch.multiple( - path, - get=MagicMock(return_value=True), + TargetPath, + lstat=MagicMock(return_value=True), is_dir=MagicMock(return_value=False), is_file=MagicMock(return_value=False), is_symlink=MagicMock(return_value=False), ), ): - mock_collector.collect_path( - path, - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) - mock_report.add_path_failed.assert_called() - mock_log.error.assert_called() - assert mock_log.error.call_args.args[0] == "- Don't know how to collect %s in module %s" + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + + assert mock_report.add_path_failed.call_args.args == (MOCK_MODULE_NAME, path) + + assert mock_log.error.call_args.args == ("- Don't know how to collect %s in module %s", path, MOCK_MODULE_NAME) + + +def test_collector_collect_path_with_symlink_branches(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/symlink/dir1/some-dir/some-file" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + collect_paths = [ + mock_target.fs.path("/symlink/dir3/some-dir/some-file"), + 
mock_target.fs.path("/symlink/dir1"), + mock_target.fs.path("/symlink/dir2/some-dir"), + ] + + assert mock_collector.output.write_entry.call_count == 3 + for num, collect_path in enumerate(collect_paths): + outpath = mock_collector._output_path(collect_path) + assert mock_collector.output.write_entry.call_args_list[num].args[0] == outpath + + assert mock_file_report.call_args.args == (MOCK_MODULE_NAME, collect_paths[0]) + + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + call_args = ("- Collecting file %s succeeded", collect_paths[0]) + assert call_args in info_log_call_args + + for num, collect_path in enumerate(collect_paths[1:]): + assert mock_symlink_report.call_args_list[num].args == (MOCK_MODULE_NAME, collect_path) + + call_args = ("- Collecting symlink branch suceeded %s", collect_path) + assert call_args in info_log_call_args + + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", path) + + +def test_collector_collect_path_with_symlink_branches_and_outpath( + mock_target: Target, + mock_collector: Collector, +) -> None: + # When a path is collected with an explicit outpath, no symlink branches should be collected. 
+ path_str = "/symlink/dir1/some-dir/some-file" + path = mock_target.fs.path(path_str) + outpath_str = "/some/other/path" + outpath = mock_collector._output_path(outpath_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME, outpath=outpath_str) + + mock_collector.output.write_entry.assert_called_once() + assert mock_collector.output.write_entry.call_args.args[0] == outpath + + mock_file_report.assert_called_once() + collect_path = mock_target.fs.path("/symlink/dir3/some-dir/some-file") + assert mock_file_report.call_args.args == (MOCK_MODULE_NAME, collect_path) + + mock_symlink_report.assert_not_called() + + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + call_args = ("- Collecting file %s succeeded", collect_path) + assert call_args in info_log_call_args + + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", path) + + +def test_collector_collect_path_late_dedup_mocked(mock_target: Target, mock_collector: Collector) -> None: + symlink_path_str = "/symlink/dir2/some-dir" + symlink_path = mock_target.fs.path(symlink_path_str) + collect_path = symlink_path / "some-file" + final_path_str = "/symlink/dir3/some-dir/some-file" + final_path = mock_target.fs.path(final_path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + patch.object(mock_collector.report, "seen_paths", new={final_path_str}), + ): + mock_collector.collect_path(collect_path, module_name=MOCK_MODULE_NAME) + + outpath = 
mock_collector._output_path(symlink_path) + mock_collector.output.write_entry.assert_called_once() + assert mock_collector.output.write_entry.call_args.args[0] == outpath + + mock_file_report.assert_not_called() + assert mock_symlink_report.call_args.args == (MOCK_MODULE_NAME, symlink_path) + + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + call_args = ("- Collecting path %s: Skipped (DEDUP)", final_path) + assert call_args in info_log_call_args + + +def test_collector_collect_path_dedup_symlink_branch(mock_target: Target, mock_collector: Collector) -> None: + symlink_path_str = "/symlink/dir2/some-dir" + symlink_path = mock_target.fs.path(symlink_path_str) + collect_path = symlink_path / "some-file" + final_path_str = "/symlink/dir3/some-dir/some-file" + final_path = mock_target.fs.path(final_path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + patch.object(mock_collector.report, "seen_paths", new={symlink_path_str}), + ): + mock_collector.collect_path(collect_path, module_name=MOCK_MODULE_NAME) + + outpath = mock_collector._output_path(final_path) + mock_collector.output.write_entry.assert_called_once() + assert mock_collector.output.write_entry.call_args.args[0] == outpath + + mock_symlink_report.assert_not_called() + assert mock_file_report.call_args.args == (MOCK_MODULE_NAME, final_path) + + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + call_args = ("- Collecting symlink branch path %s: Skipped (DEDUP)", symlink_path) + assert call_args in info_log_call_args + + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", collect_path) @pytest.mark.parametrize( @@ -217,176 +432,228 @@ def test_collector_collect_path_no_file_type(mock_target: 
Target, mock_collector ( "add_path_missing", OSError(errno.ENOENT, "foo"), - "- Path %s is not found", + "- Path %s is not found (while collecting %s)", ), ( "add_path_failed", OSError(errno.EACCES, "foo"), - "- Permission denied while accessing path %s", + "- Permission denied while accessing path %s (while collecting %s)", ), ( "add_path_failed", OSError(255, "foo"), - "- OSError while collecting path %s", + "- OSError while collecting path %s (while collecting %s)", ), ( "add_path_missing", FileNotFoundError, - "- Path %s is not found", + "- Path %s is not found (while collecting %s)", ), ( "add_path_missing", NotADirectoryError, - "- Path %s is not found", + "- Path %s is not found (while collecting %s)", ), ( "add_path_missing", NotASymlinkError, - "- Path %s is not found", + "- Path %s is not found (while collecting %s)", ), ( "add_path_missing", SymlinkRecursionError, - "- Path %s is not found", + "- Path %s is not found (while collecting %s)", ), ( "add_path_missing", ValueError, - "- Path %s is not found", + "- Path %s is not found (while collecting %s)", ), ( "add_path_failed", Exception, - "- Failed to collect path %s", + "- Failed to collect path %s (while collecting %s)", ), ], ) def test_collector_collect_path_with_exception( mock_target: Target, mock_collector: Collector, report_func: str, exception: type[Exception], log_msg: str ) -> None: - path = mock_target.fs.path("/foo/bar/non-existing-file") + path_str = "/foo/bar/non-existing-file" + path = mock_target.fs.path(path_str) with ( patch("acquire.collector.log", autospec=True) as mock_log, - patch.object(mock_collector, "report", autospec=True) as mock_report, - patch.object(path, "get", side_effect=exception, autospec=True), + patch.object(mock_collector, "report") as mock_report, + patch.object(mock_collector.report, "was_path_seen", return_value=False), + patch("acquire.collector.normalize_path", return_value=path_str, autospec=True), + patch.object(TargetPath, "get", side_effect=exception, 
autospec=True), ): - mock_collector.collect_path( - path, - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - ) + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + report_func = getattr(mock_report, report_func) - report_func.assert_called() - mock_log.error.assert_called() - assert mock_log.error.call_args.args[0] == log_msg + assert report_func.call_args.args == (MOCK_MODULE_NAME, path) + mock_log.error.assert_called_once() + assert mock_log.error.call_args.args == (log_msg, path, path) -@pytest.mark.parametrize( - "path_name, volatile, collect_path_called, write_entry_called", - [ - ("/foo/bar/some-dir", False, False, False), - ("/foo/bar/some-dir", True, False, True), - ("/foo/bar", False, True, False), - ("foo/bar", True, True, False), - ], -) -def test_collector_collect_dir( + +def test_collector_collect_path_with_dir(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME, seen_paths={path_str}) + + collect_paths = [ + mock_target.fs.path("/foo/bar/some-file"), + mock_target.fs.path("/foo/bar/own-file"), + mock_target.fs.path("/foo/bar/some-symlink"), + ] + + assert mock_collector.output.write_entry.call_count == 3 + write_call_args = {call.args[0] for call in mock_collector.output.write_entry.call_args_list} + for collect_path in collect_paths: + outpath = mock_collector._output_path(collect_path) + assert outpath in write_call_args + + file_report_call_args = {call.args for call in mock_file_report.call_args_list} + 
info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + error_log_calls = {call_args.args for call_args in mock_log.error.call_args_list} + + for collect_path in collect_paths[:2]: + assert (MOCK_MODULE_NAME, collect_path) in file_report_call_args + + call_args = ("- Collecting file %s succeeded", collect_path) + assert call_args in info_log_call_args + + assert mock_symlink_report.call_args.args == (MOCK_MODULE_NAME, collect_paths[2]) + + assert ("- Collecting symlink %s succeeded", collect_paths[2]) in info_log_call_args + + # There is 1 empty subdirectory, but it should be silently skipped + empty_dir = mock_target.fs.path("/foo/bar/some-dir") + assert ("- Failed to collect directory %s, it is empty", empty_dir) not in error_log_calls + + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", path) + + +def test_collector_collect_path_with_empty_dir(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-dir/" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_dir_failed", autospec=True) as mock_report, + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME) + + mock_collector.output.write_entry.assert_not_called() + mock_collector.output.write_volatile.assert_not_called() + + assert mock_report.call_args.args == (MOCK_MODULE_NAME, path) + + assert mock_log.error.call_args.args == ("- Failed to collect directory %s, it is empty", path) + + +def test_collector_collect_path_with_empty_dir_volatile(mock_target: Target, mock_collector: Collector) -> None: + path_str = "/foo/bar/some-dir/" + path = mock_target.fs.path(path_str) + + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_dir_collected", autospec=True) as mock_report, + ): + mock_collector.collect_path(path, module_name=MOCK_MODULE_NAME, volatile=True) + 
+ outpath = mock_collector._output_path(path) + mock_collector.output.write_entry.assert_called_once() + assert mock_collector.output.write_entry.call_args.args[0] == outpath + + assert mock_report.call_args.args == (MOCK_MODULE_NAME, path) + + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + call_args = ("- Collecting EMPTY directory %s succeeded", path) + assert call_args in info_log_call_args + + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", path) + + +def test_collector_collect_path_with_symlink( mock_target: Target, mock_collector: Collector, - path_name: str, - volatile: bool, - collect_path_called: bool, - write_entry_called: bool, ) -> None: - path = mock_target.fs.path(path_name) - with patch.object(mock_collector, "collect_path", autospec=True): - mock_collector.collect_dir( - path, - seen_paths=MOCK_SEEN_PATHS, - module_name=MOCK_MODULE_NAME, - follow=False, - volatile=volatile, - ) - assert mock_collector.collect_path.called == collect_path_called - assert mock_collector.output.write_entry.called == write_entry_called + symlink_path_str = "/foo/bar/some-symlink" + symlink_path = mock_target.fs.path(symlink_path_str) + final_path_str = "/foo/bar/some-file" + final_path = mock_target.fs.path(final_path_str) + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + ): + mock_collector.collect_path(symlink_path, module_name=MOCK_MODULE_NAME) -def create_temp_files(tmp_path: Path, paths: list[str]) -> None: - for path in paths: - creation_path = tmp_path.joinpath(path) - creation_path.parent.mkdir(parents=True, exist_ok=True) - creation_path.touch() + assert mock_collector.output.write_entry.call_count == 2 + write_call_args = {call.args[0] for call in 
mock_collector.output.write_entry.call_args_list} + for collect_path in [symlink_path, final_path]: + outpath = mock_collector._output_path(collect_path) + assert outpath in write_call_args + assert mock_symlink_report.call_args.args == (MOCK_MODULE_NAME, symlink_path) + assert mock_file_report.call_args.args == (MOCK_MODULE_NAME, final_path) -def collect_report( - collector: Collector, - function_name: str, - collect_point: Path, -) -> CollectionReport: - func = getattr(collector, f"collect_{function_name}") - func(collect_point, module_name=MOCK_MODULE_NAME) + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + symlink_call_args = ("- Collecting symlink %s succeeded", symlink_path) + file_call_args = ("- Collecting file %s succeeded", final_path) - return collector.report + assert symlink_call_args in info_log_call_args + assert file_call_args in info_log_call_args + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", symlink_path) -@pytest.mark.parametrize( - "function_name, collection_point, expected_results, create_paths", - [ - ( - "dir", - "collect", - 2, - ["collect/this/file", "collect/this/test"], - ), - ( - "glob", - "/collect/*/file", - 1, - ["collect/this/file"], - ), - ( - "glob", - "/collect/*/file", - 0, - [], - ), - ( - "file", - "collect/this/file", - 1, - ["collect/this/file"], - ), - ], -) -def test_collector_report_succeeded( - tmp_path: Path, + +def test_collector_collect_path_with_symlink_volatile( mock_target: Target, mock_collector: Collector, - function_name: str, - collection_point: str, - expected_results: int, - create_paths: list[str], ) -> None: - create_temp_files(tmp_path, create_paths) - fs = mock_target.filesystems[0] - fs.map_dir("/", tmp_path) - mock_target.fs.mount("/", fs) + symlink_path_str = "/foo/bar/some-symlink" + symlink_path = mock_target.fs.path(symlink_path_str) - report = collect_report(mock_collector, function_name, collection_point) - successful_outputs = 
list(value for value in report.registry if value.outcome == Outcome.SUCCESS) - assert len(successful_outputs) == expected_results + with ( + patch("acquire.collector.log", autospec=True) as mock_log, + patch.object(mock_collector.report, "add_file_collected", autospec=True) as mock_file_report, + patch.object(mock_collector.report, "add_symlink_collected", autospec=True) as mock_symlink_report, + ): + mock_collector.collect_path(symlink_path, module_name=MOCK_MODULE_NAME, volatile=True) + + assert mock_collector.output.write_entry.call_count == 1 + outpath = mock_collector._output_path(symlink_path) + assert mock_collector.output.write_entry.call_args.args[0] == outpath + assert mock_symlink_report.call_args.args == (MOCK_MODULE_NAME, symlink_path) + mock_file_report.assert_not_called() -def test_collector_sysvol_map() -> None: - vfs = VirtualFilesystem() + info_log_call_args = {call_args.args for call_args in mock_log.info.call_args_list} + symlink_call_args = ("- Collecting symlink %s succeeded", symlink_path) + assert symlink_call_args in info_log_call_args - mock_target = Mock() - mock_target.os = "windows" - mock_target.fs.mounts = {"sysvol": vfs, "y:": vfs} - mock_target.props = {"sysvol_drive": "y:"} + assert mock_log.debug.call_args.args == ("- Collecting path %s succeeded", symlink_path) - mock_output = Mock(spec=Output) - collector = Collector(mock_target, mock_output) +def test_collector_collect_glob(mock_collector: Collector) -> None: + with ( + patch.object(mock_collector, "collect_path", autospec=True), + patch.object(mock_collector, "report"), + ): + mock_collector.collect_glob("/foo/bar/*", module_name=MOCK_MODULE_NAME) - assert collector._output_path("sysvol/some/path") == "fs/y:/some/path" + assert mock_collector.collect_path.call_count == 4 + assert mock_collector.collect_path.call_args.kwargs.get("module_name", None) == MOCK_MODULE_NAME diff --git a/tests/test_utils.py b/tests/test_utils.py index f9650d81..85303dc0 100644 --- 
a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,11 +1,11 @@ import argparse -import platform -from pathlib import Path, PureWindowsPath +from pathlib import Path from typing import Optional from unittest.mock import MagicMock, patch import pytest from dissect.target import Target +from dissect.target.helpers.fsutil import TargetPath from acquire.acquire import MODULES, PROFILES, VOLATILE from acquire.utils import ( @@ -13,7 +13,6 @@ check_and_set_log_args, create_argument_parser, normalize_path, - normalize_sysvol, ) @@ -322,147 +321,208 @@ def test_check_and_set_acquire_args_cagent() -> None: @pytest.mark.parametrize( - "path, sysvol, resolve, lower_case, case_sensitive, os, result", + "path, resolve_parents, preserve_case, sysvol, os, result, as_path", [ ( - Path("/foo/bar"), - None, + "/foo/bar", False, - True, - True, + False, + None, "dummy", "/foo/bar", + True, ), ( - Path("/foo/BAR"), - None, + "/foo/BAR", False, - True, False, + None, "dummy", - "/foo/bar", + "/foo/BAR", + True, ), ( - Path("/foo/BAR"), - None, + "/foo/BAR", False, True, - True, + None, "dummy", "/foo/BAR", + True, ), ( - Path("/foo/../bar"), - None, + "/bla/../foo/bar", False, - True, - True, + False, + None, "dummy", - "/foo/../bar", + "/bla/../foo/bar", + True, ), ( - Path("/foo/../foo/bar"), - None, - True, - True, + "/bla/../foo/bar", True, + False, + None, "dummy", "/foo/bar", + True, ), ( - PureWindowsPath("c:\\foo\\bar"), - "c:", + "c:\\foo\\bar", False, - True, False, + "c:", "windows", "c:/foo/bar", + True, ), ( - PureWindowsPath("C:\\foo\\bar"), - "c:", + "C:\\foo\\bar", False, - True, False, + "c:", "windows", "c:/foo/bar", + True, ), ( - PureWindowsPath("\\??\\C:\\foo\\bar"), - "c:", + "\\??\\C:\\foo\\bar", False, - True, False, + "c:", "windows", "c:/foo/bar", + True, ), ( - PureWindowsPath("\\??\\c:\\foo\\bar"), - "c:", + "\\??\\c:\\foo\\bar", False, - True, False, + "c:", "windows", "c:/foo/bar", + True, ), ( - PureWindowsPath("D:\\foo\\bar"), - "c:", + "D:\\foo\\bar", 
False, - True, False, + "c:", "windows", "d:/foo/bar", + True, ), ( - PureWindowsPath("D:\\Foo\\BAR"), + "D:\\Foo\\BAR", + False, + True, "c:", + "windows", + "D:/Foo/BAR", + True, + ), + ( + "sysvol\\foo\\bar", False, + True, + "c:", + "windows", + "C:/foo/bar", + True, + ), + ( + "sysvol/foo/bar", False, + True, + None, + "dummy", + "sysvol/foo/bar", + True, + ), + ( + "/??/sysvol/foo/bar", False, + True, + None, + "dummy", + "/??/sysvol/foo/bar", + True, + ), + ( + "sysvol/Foo/../BAR", + True, + False, + "c:", "windows", - "D:/Foo/BAR", + "c:/bar", + True, + ), + ( + "sysvol/Foo/../BAR", + True, + True, + "c:", + "windows", + "C:/Foo/../BAR", + False, ), ( - PureWindowsPath("sysvol\\foo\\bar"), + "/??/sysvol/Foo/../BAR", + True, + False, "c:", + "windows", + "c:/foo/../bar", False, + ), + ( + "sysvol", False, + True, + "SYSVOL", + "windows", + "sysvol", False, + ), + ( + "a:", + False, + True, + "C:", "windows", - "c:/foo/bar", + "A:", + False, ), ], ) def test_utils_normalize_path( mock_target: Target, - path: Path, + path: str, + resolve_parents: bool, + preserve_case: bool, sysvol: Optional[str], - resolve: bool, - lower_case: bool, - case_sensitive: bool, os: str, result: str, + as_path: bool, ) -> None: + case_sensitive = True + if os == "windows": + case_sensitive = False + with patch.object(mock_target, "os", new=os), patch.object( mock_target.fs, "_case_sensitive", new=case_sensitive - ), patch.dict(mock_target.props, {"sysvol_drive": sysvol}): - resolved_path = normalize_path(mock_target, path, resolve=resolve, lower_case=lower_case) - - if platform.system() == "Windows": - # A resolved path on windows adds a C:\ prefix. 
So we check if it ends with our expected - # path string - assert resolved_path.endswith(result) - else: - assert resolved_path == result - - -@pytest.mark.parametrize( - "path, sysvol, result", - [ - ("sysvol/foo/bar", "c:", "c:/foo/bar"), - ("/sysvol/foo/bar", "c:", "c:/foo/bar"), - ], -) -def test_normalize_sysvol(path: str, sysvol: str, result: str) -> None: - assert normalize_sysvol(path, sysvol) == result + ), patch.object(mock_target.fs, "_alt_separator", new=("\\" if os == "windows" else "/")), patch.dict( + mock_target.props, {"sysvol_drive": sysvol} + ): + if as_path: + path = TargetPath(mock_target.fs, path) + + normalized_path = normalize_path( + mock_target, + path, + resolve_parents=resolve_parents, + preserve_case=preserve_case, + ) + + assert normalized_path == result