Skip to content

Commit

Permalink
migrate code clean up change
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielYang59 committed Oct 22, 2024
1 parent 0b5e33e commit 7f261d8
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions tests/test_shutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,57 +20,57 @@
remove,
)

TEST_DIR = os.path.join(os.path.dirname(__file__), "test_files")
test_dir = os.path.join(os.path.dirname(__file__), "test_files")


class TestCopyR:
def setup_method(self):
os.mkdir(os.path.join(TEST_DIR, "cpr_src"))
with open(os.path.join(TEST_DIR, "cpr_src", "test"), "w") as f:
os.mkdir(os.path.join(test_dir, "cpr_src"))
with open(os.path.join(test_dir, "cpr_src", "test"), "w") as f:
f.write("what")
os.mkdir(os.path.join(TEST_DIR, "cpr_src", "sub"))
with open(os.path.join(TEST_DIR, "cpr_src", "sub", "testr"), "w") as f:
os.mkdir(os.path.join(test_dir, "cpr_src", "sub"))
with open(os.path.join(test_dir, "cpr_src", "sub", "testr"), "w") as f:
f.write("what2")
if os.name != "nt":
os.symlink(
os.path.join(TEST_DIR, "cpr_src", "test"),
os.path.join(TEST_DIR, "cpr_src", "mysymlink"),
os.path.join(test_dir, "cpr_src", "test"),
os.path.join(test_dir, "cpr_src", "mysymlink"),
)

def test_recursive_copy_and_compress(self):
copy_r(os.path.join(TEST_DIR, "cpr_src"), os.path.join(TEST_DIR, "cpr_dst"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_dst", "test"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_dst", "sub", "testr"))

compress_dir(os.path.join(TEST_DIR, "cpr_src"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "test.gz"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "sub", "testr.gz"))

decompress_dir(os.path.join(TEST_DIR, "cpr_src"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "test"))
assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "sub", "testr"))
with open(os.path.join(TEST_DIR, "cpr_src", "test")) as f:
copy_r(os.path.join(test_dir, "cpr_src"), os.path.join(test_dir, "cpr_dst"))
assert os.path.exists(os.path.join(test_dir, "cpr_dst", "test"))
assert os.path.exists(os.path.join(test_dir, "cpr_dst", "sub", "testr"))

compress_dir(os.path.join(test_dir, "cpr_src"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "test.gz"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "sub", "testr.gz"))

decompress_dir(os.path.join(test_dir, "cpr_src"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "test"))
assert os.path.exists(os.path.join(test_dir, "cpr_src", "sub", "testr"))
with open(os.path.join(test_dir, "cpr_src", "test")) as f:
txt = f.read()
assert txt == "what"

def test_pathlib(self):
test_path = Path(TEST_DIR)
test_path = Path(test_dir)
copy_r(test_path / "cpr_src", test_path / "cpr_dst")
assert (test_path / "cpr_dst" / "test").exists()
assert (test_path / "cpr_dst" / "sub" / "testr").exists()

def teardown_method(self):
shutil.rmtree(os.path.join(TEST_DIR, "cpr_src"))
shutil.rmtree(os.path.join(TEST_DIR, "cpr_dst"))
shutil.rmtree(os.path.join(test_dir, "cpr_src"))
shutil.rmtree(os.path.join(test_dir, "cpr_dst"))


class TestCompressFileDir:
def setup_method(self):
with open(os.path.join(TEST_DIR, "tempfile"), "w") as f:
with open(os.path.join(test_dir, "tempfile"), "w") as f:
f.write("hello world")

def test_compress_and_decompress_file(self):
fname = os.path.join(TEST_DIR, "tempfile")
fname = os.path.join(test_dir, "tempfile")

for fmt in ["gz", "bz2"]:
compress_file(fname, fmt)
Expand All @@ -93,8 +93,8 @@ def test_compress_and_decompress_file(self):
assert decompress_file("non-existent.bz2") is None

def test_compress_and_decompress_with_target_dir(self):
fname = os.path.join(TEST_DIR, "tempfile")
target_dir = os.path.join(TEST_DIR, "temp_target_dir")
fname = os.path.join(test_dir, "tempfile")
target_dir = os.path.join(test_dir, "temp_target_dir")

for fmt in ["gz", "bz2"]:
compress_file(fname, fmt, target_dir)
Expand All @@ -117,20 +117,20 @@ def test_compress_and_decompress_with_target_dir(self):
assert f.read() == "hello world"

def teardown_method(self):
os.remove(os.path.join(TEST_DIR, "tempfile"))
os.remove(os.path.join(test_dir, "tempfile"))


class TestGzipDir:
def setup_method(self):
os.mkdir(os.path.join(TEST_DIR, "gzip_dir"))
with open(os.path.join(TEST_DIR, "gzip_dir", "tempfile"), "w") as f:
os.mkdir(os.path.join(test_dir, "gzip_dir"))
with open(os.path.join(test_dir, "gzip_dir", "tempfile"), "w") as f:
f.write("what")

self.mtime = os.path.getmtime(os.path.join(TEST_DIR, "gzip_dir", "tempfile"))
self.mtime = os.path.getmtime(os.path.join(test_dir, "gzip_dir", "tempfile"))

def test_gzip_dir(self):
full_f = os.path.join(TEST_DIR, "gzip_dir", "tempfile")
gzip_dir(os.path.join(TEST_DIR, "gzip_dir"))
full_f = os.path.join(test_dir, "gzip_dir", "tempfile")
gzip_dir(os.path.join(test_dir, "gzip_dir"))

assert os.path.exists(f"{full_f}.gz")
assert not os.path.exists(full_f)
Expand All @@ -142,7 +142,7 @@ def test_gzip_dir(self):

def test_gzip_dir_file_coexist(self):
"""Test case where both file and file.gz exist."""
full_f = os.path.join(TEST_DIR, "gzip_dir", "temptestfile")
full_f = os.path.join(test_dir, "gzip_dir", "temptestfile")
gz_f = f"{full_f}.gz"

# Create both the file and its gzipped version
Expand All @@ -154,7 +154,7 @@ def test_gzip_dir_file_coexist(self):
with pytest.warns(
UserWarning, match="Both temptestfile and temptestfile.gz exist."
):
gzip_dir(os.path.join(TEST_DIR, "gzip_dir"))
gzip_dir(os.path.join(test_dir, "gzip_dir"))

# Verify contents of the files
with open(full_f, "r") as f:
Expand All @@ -164,13 +164,13 @@ def test_gzip_dir_file_coexist(self):
assert g.read() == b"gzipped"

def test_handle_sub_dirs(self):
sub_dir = os.path.join(TEST_DIR, "gzip_dir", "sub_dir")
sub_dir = os.path.join(test_dir, "gzip_dir", "sub_dir")
sub_file = os.path.join(sub_dir, "new_tempfile")
os.mkdir(sub_dir)
with open(sub_file, "w") as f:
f.write("anotherwhat")

gzip_dir(os.path.join(TEST_DIR, "gzip_dir"))
gzip_dir(os.path.join(test_dir, "gzip_dir"))

assert os.path.exists(f"{sub_file}.gz")
assert not os.path.exists(sub_file)
Expand All @@ -179,31 +179,31 @@ def test_handle_sub_dirs(self):
assert g.readline().decode("utf-8") == "anotherwhat"

def teardown_method(self):
shutil.rmtree(os.path.join(TEST_DIR, "gzip_dir"))
shutil.rmtree(os.path.join(test_dir, "gzip_dir"))


class TestRemove:
@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_file(self):
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempdir = tempfile.mkdtemp(dir=test_dir)
tempf = tempfile.mkstemp(dir=tempdir)[1]
remove(tempf)
assert not os.path.isfile(tempf)
shutil.rmtree(tempdir)

@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_folder(self):
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempdir = tempfile.mkdtemp(dir=test_dir)
remove(tempdir)
assert not os.path.isdir(tempdir)

@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_symlink(self):
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempdir = tempfile.mkdtemp(dir=test_dir)
tempf = tempfile.mkstemp(dir=tempdir)[1]

os.symlink(tempdir, os.path.join(TEST_DIR, "temp_link"))
templink = os.path.join(TEST_DIR, "temp_link")
os.symlink(tempdir, os.path.join(test_dir, "temp_link"))
templink = os.path.join(test_dir, "temp_link")
remove(templink)
assert os.path.isfile(tempf)
assert os.path.isdir(tempdir)
Expand All @@ -212,11 +212,11 @@ def test_remove_symlink(self):

@unittest.skipIf(platform.system() == "Windows", "Skip on windows")
def test_remove_symlink_follow(self):
tempdir = tempfile.mkdtemp(dir=TEST_DIR)
tempdir = tempfile.mkdtemp(dir=test_dir)
tempf = tempfile.mkstemp(dir=tempdir)[1]

os.symlink(tempdir, os.path.join(TEST_DIR, "temp_link"))
templink = os.path.join(TEST_DIR, "temp_link")
os.symlink(tempdir, os.path.join(test_dir, "temp_link"))
templink = os.path.join(test_dir, "temp_link")
remove(templink, follow_symlink=True)
assert not os.path.isfile(tempf)
assert not os.path.isdir(tempdir)
Expand Down

0 comments on commit 7f261d8

Please sign in to comment.