Skip to content

Commit

Permalink
Fix #181: Arbitrary file write during tarfile extraction (#182)
Browse files Browse the repository at this point in the history
* Fix #181: Arbitrary file write during tarfile extraction

* cleanup

* Add changie

* simplify method and add typing

* Add back members
  • Loading branch information
aranke authored Aug 19, 2024
1 parent 4b8a41e commit 35af654
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/unreleased/Security-20240808-154439.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Security
body: Fix arbitrary file write during tarfile extraction
time: 2024-08-08T15:44:39.601346-05:00
custom:
Author: aranke
PR: "182"
28 changes: 27 additions & 1 deletion dbt_common/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class FindMatchingParams:
root_path: str
relative_paths_to_search: List[str]
file_pattern: str

# ignore_spec: Optional[PathSpec] = None

def __init__(
Expand Down Expand Up @@ -608,11 +609,36 @@ def rename(from_path: str, to_path: str, force: bool = False) -> None:
shutil.move(from_path, to_path)


def safe_extract(tarball: tarfile.TarFile, path: str = ".") -> None:
"""
Fix for CWE-22: Improper Limitation of a Pathname to a Restricted Directory ('Path Traversal')
Solution copied from https://github.com/mindsdb/mindsdb/blob/main/mindsdb/utilities/fs.py
"""

def _is_within_directory(directory, target):
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)
prefix = os.path.commonprefix([abs_directory, abs_target])
return prefix == abs_directory

# for py >= 3.12
if hasattr(tarball, "data_filter"):
tarball.extractall(path, filter="data")
else:
members = tarball.getmembers()
for member in members:
member_path = os.path.join(path, member.name)
if not _is_within_directory(path, member_path):
raise tarfile.OutsideDestinationError(member, path)

tarball.extractall(path, members=members)


def untar_package(tar_path: str, dest_dir: str, rename_to: Optional[str] = None) -> None:
tar_path = convert_path(tar_path)
tar_dir_name = None
with tarfile.open(tar_path, "r:gz") as tarball:
tarball.extractall(dest_dir)
safe_extract(tarball, dest_dir)
tar_dir_name = os.path.commonprefix(tarball.getnames())
if rename_to:
downloaded_path = os.path.join(dest_dir, tar_dir_name)
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/test_system_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,18 @@ def test_untar_package_empty(self) -> None:
with self.assertRaises(tarfile.ReadError) as exc:
dbt_common.clients.system.untar_package(named_file.name, self.tempdest)
self.assertEqual("empty file", str(exc.exception))

def test_untar_package_outside_directory(self) -> None:
with NamedTemporaryFile(
prefix="my-package.2", suffix=".tar.gz", dir=self.tempdir, delete=False
) as named_tar_file:
tar_file_full_path = named_tar_file.name
with NamedTemporaryFile(prefix="a", suffix=".txt", dir=self.tempdir) as file_a:
file_a.write(b"some text in the text file")
relative_file_a = "/../" + os.path.basename(file_a.name)
with tarfile.open(fileobj=named_tar_file, mode="w:gz") as tar:
tar.addfile(tarfile.TarInfo(relative_file_a), open(file_a.name))

assert tarfile.is_tarfile(tar.name)
with self.assertRaises(tarfile.OutsideDestinationError):
dbt_common.clients.system.untar_package(tar_file_full_path, self.tempdest)

0 comments on commit 35af654

Please sign in to comment.