Skip to content

Commit

Permalink
Merge pull request #551 from arosen93/path
Browse files Browse the repository at this point in the history
Allow pathlib.Path types in monty.shutil
  • Loading branch information
shyuep authored Sep 5, 2023
2 parents 2751267 + 5015207 commit b7d340f
Showing 1 changed file with 47 additions and 40 deletions.
87 changes: 47 additions & 40 deletions monty/shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,95 +5,100 @@
import shutil
import warnings
from gzip import GzipFile
from pathlib import Path
from typing import Literal

from .io import zopen


def copy_r(src, dst):
def copy_r(src: str | Path, dst: str | Path) -> None:
"""
Implements a recursive copy function similar to Unix's "cp -r" command.
Surprisingly, python does not have a real equivalent. shutil.copytree
only works if the destination directory is not present.
Args:
src (str): Source folder to copy.
dst (str): Destination folder.
src (str | Path): Source folder to copy.
dst (str | Path): Destination folder.
"""
abssrc = os.path.abspath(src)
absdst = os.path.abspath(dst)
try:
os.makedirs(absdst)
except OSError:
# If absdst exists, an OSError is raised. We ignore this error.
pass
src = Path(src)
dst = Path(dst)
abssrc = src.resolve()
absdst = dst.resolve()
os.makedirs(absdst, exist_ok=True)
for f in os.listdir(abssrc):
fpath = os.path.join(abssrc, f)
if os.path.isfile(fpath):
fpath = Path(abssrc, f)
if Path(fpath).is_file():
shutil.copy(fpath, absdst)
elif not absdst.startswith(fpath):
copy_r(fpath, os.path.join(absdst, f))
elif str(fpath) not in str(absdst):
copy_r(fpath, Path(absdst, f))
else:
warnings.warn(f"Cannot copy {fpath} to itself")


def gzip_dir(path, compresslevel=6):
def gzip_dir(path: str | Path, compresslevel: int = 6) -> None:
"""
Gzips all files in a directory. Note that this is different from
shutil.make_archive, which creates a tar archive. The aim of this method
is to create gzipped files that can still be read using common Unix-style
commands like zless or zcat.
Args:
path (str): Path to directory.
path (str | Path): Path to directory.
compresslevel (int): Level of compression, 1-9. 9 is default for
GzipFile, 6 is default for gzip.
"""
path = Path(path)
for root, _, files in os.walk(path):
for f in files:
full_f = os.path.abspath(os.path.join(root, f))
if not f.lower().endswith("gz") and not os.path.isdir(full_f):
full_f = Path(root, f).resolve()
if Path(f).suffix.lower() != ".gz" and not full_f.is_dir():
with open(full_f, "rb") as f_in, GzipFile(f"{full_f}.gz", "wb", compresslevel=compresslevel) as f_out:
shutil.copyfileobj(f_in, f_out)
shutil.copystat(full_f, f"{full_f}.gz")
os.remove(full_f)


def compress_file(filepath, compression="gz"):
def compress_file(filepath: str | Path, compression: Literal["gz", "bz2"] = "gz") -> None:
"""
Compresses a file with the correct extension. Functions like standard
Unix command line gzip and bzip2 in the sense that the original
uncompressed files are not retained.
Args:
filepath (str): Path to file.
filepath (str | Path): Path to file.
compression (str): A compression mode. Valid options are "gz" or
"bz2". Defaults to "gz".
"""
filepath = Path(filepath)
if compression not in ["gz", "bz2"]:
raise ValueError("Supported compression formats are 'gz' and 'bz2'.")
if not filepath.lower().endswith(f".{compression}"):
if filepath.suffix.lower() != f".{compression}":
with open(filepath, "rb") as f_in, zopen(f"{filepath}.{compression}", "wb") as f_out:
f_out.writelines(f_in)
os.remove(filepath)


def compress_dir(path, compression="gz"):
def compress_dir(path: str | Path, compression: Literal["gz", "bz2"] = "gz") -> None:
"""
Recursively compresses all files in a directory. Note that this
compresses all files singly, i.e., it does not create a tar archive. For
that, just use Python tarfile class.
Args:
path (str): Path to parent directory.
path (str | Path): Path to parent directory.
compression (str): A compression mode. Valid options are "gz" or
"bz2". Defaults to gz.
"""
for parent, _subdirs, files in os.walk(path):
path = Path(path)
for parent, _, files in os.walk(path):
for f in files:
compress_file(os.path.join(parent, f), compression=compression)
compress_file(Path(parent, f), compression=compression)

return None


def decompress_file(filepath) -> str | None:
def decompress_file(filepath: str | Path) -> str | None:
"""
Decompresses a file with the correct extension. Automatically detects
gz, bz2 or z extension.
Expand All @@ -104,31 +109,32 @@ def decompress_file(filepath) -> str | None:
Returns:
str: The decompressed file path.
"""
toks = filepath.split(".")
file_ext = toks[-1].upper()
if file_ext in ["BZ2", "GZ", "Z"] and os.path.isfile(filepath):
decompressed_file = ".".join(toks[0:-1])
filepath = Path(filepath)
file_ext = filepath.suffix
if file_ext.lower() in [".bz2", ".gz", ".z"] and filepath.is_file():
decompressed_file = Path(str(filepath).removesuffix(file_ext))
with zopen(filepath, "rb") as f_in, open(decompressed_file, "wb") as f_out:
f_out.writelines(f_in)
os.remove(filepath)

return decompressed_file
return str(decompressed_file)
return None


def decompress_dir(path):
def decompress_dir(path: str | Path) -> None:
"""
Recursively decompresses all files in a directory.
Args:
path (str): Path to parent directory.
path (str | Path): Path to parent directory.
"""
for parent, _subdirs, files in os.walk(path):
path = Path(path)
for parent, _, files in os.walk(path):
for f in files:
decompress_file(os.path.join(parent, f))
decompress_file(Path(parent, f))


def remove(path, follow_symlink=False):
def remove(path: str | Path, follow_symlink: bool = False) -> None:
"""
Implements a remove function that will delete files, folder trees and
symlink trees.
Expand All @@ -138,14 +144,15 @@ def remove(path, follow_symlink=False):
3.) Remove directory with rmtree
Args:
path (str): path to remove
path (str | Path): path to remove
follow_symlink(bool): follow symlinks and removes whatever is in them
"""
if os.path.isfile(path):
path = Path(path)
if path.is_file():
os.remove(path)
elif os.path.islink(path):
elif path.is_symlink():
if follow_symlink:
remove(os.readlink(path))
os.unlink(path)
Path.unlink(path)
else:
shutil.rmtree(path)

0 comments on commit b7d340f

Please sign in to comment.