Skip to content

Commit

Permalink
Improve inference of archive name
Browse files Browse the repository at this point in the history
- add list of allowed file name extensions
- allow to provide archive name explicitly on the command line
  • Loading branch information
akahles committed Nov 18, 2024
1 parent 7efcc0e commit 48ce9be
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
2 changes: 2 additions & 0 deletions archiver/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@
ENCRYPTION_ALGORITHM = "AES256"
ENV_VAR_MAPPER_MAX_CPUS = "ARCHIVER_MAX_CPUS_ENV_VAR"
DEFAULT_COMPRESSION_LEVEL = 6
# Whitelist of archive-related file name suffixes, as regex fragments
# ('.part[0-9]+' matches split-archive part suffixes such as '.part3').
# NOTE: the leading '.' is an unescaped regex wildcard; this is harmless in
# practice only because pathlib suffixes always begin with a literal '.'.
ALLOWED_SUFFIXES = ['.part[0-9]+', '.tar', '.md5', '.lz', '.gpg', '.lst']
# Callers apply this with re.match(), which anchors the start; \Z anchors the
# end so that suffixes with trailing junk (e.g. '.tarball', '.part1x') are
# NOT accepted as allowed suffixes.
ALLOWED_SUFFIXES_REG = '(?:' + '|'.join(ALLOWED_SUFFIXES) + r')\Z'

MD5_LINE_REGEX = re.compile(r'(\S+)\s+(\S.*)')
31 changes: 26 additions & 5 deletions archiver/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import unicodedata

from .constants import READ_CHUNK_BYTE_SIZE, COMPRESSED_ARCHIVE_SUFFIX, \
ENCRYPTED_ARCHIVE_SUFFIX, ENV_VAR_MAPPER_MAX_CPUS, MD5_LINE_REGEX
ENCRYPTED_ARCHIVE_SUFFIX, ENV_VAR_MAPPER_MAX_CPUS, MD5_LINE_REGEX, \
ALLOWED_SUFFIXES_REG


def get_files_with_type_in_directory_or_terminate(directory, file_type):
Expand Down Expand Up @@ -339,15 +340,35 @@ def file_is_valid_archive_or_terminate(file_path):


def filename_without_extensions(path):
    """Return path's final component with the trailing run of allowed suffixes removed.

    Suffixes are inspected right-to-left and stripped only while they match
    ALLOWED_SUFFIXES_REG (e.g. '.partN', '.tar', '.lz', '.gpg', '.md5',
    '.lst'); the first non-matching suffix stops the stripping, so
    'name.v1.tar.lz' keeps '.v1'.  Matching is case-insensitive.

    Fix: if there is no allowed suffix at all, return the full name —
    the previous slice `name[:-len('')]` evaluated to `name[:0]` and
    silently returned an empty string.
    """
    allowed = []
    # Walk from the last suffix backwards; only the contiguous trailing
    # run of whitelisted suffixes is removed.
    for suffix in reversed(path.suffixes):
        if not re.match(ALLOWED_SUFFIXES_REG, suffix.lower()):
            break
        allowed.append(suffix)

    stripped_len = sum(len(s) for s in allowed)
    # Guard against stripped_len == 0: name[:-0] would be the empty string.
    return path.name[:-stripped_len] if stripped_len else path.name


def filepath_without_extensions(path: Path) -> Path:
    """Return path with the trailing run of allowed suffixes removed, keeping the parent.

    Mirrors filename_without_extensions(): suffixes are stripped from the
    right only while they match ALLOWED_SUFFIXES_REG (case-insensitive);
    an unrecognised suffix stops the stripping.

    Fix: if there is no allowed suffix at all, keep the full name — the
    previous slice `name[:-len('')]` evaluated to `name[:0]` and silently
    produced an empty final component.
    """
    allowed = []
    # Right-to-left scan; stop at the first suffix that is not whitelisted.
    for suffix in reversed(path.suffixes):
        if not re.match(ALLOWED_SUFFIXES_REG, suffix.lower()):
            break
        allowed.append(suffix)

    stripped_len = sum(len(s) for s in allowed)
    # Guard against stripped_len == 0: name[:-0] would be the empty string.
    name = path.name[:-stripped_len] if stripped_len else path.name
    return path.parent / name

Expand All @@ -362,7 +383,7 @@ def infer_source_name(source_path: Path) -> Path:
if len(unique_names) == 0:
terminate_with_message('There are no archive files present')
elif len(unique_names) > 1:
terminate_with_message(f'More than one possible archive name detected: {str(unique_names)}')
terminate_with_message(f'Automatic archive name detection has failed. More than one possible archive name detected: {str(unique_names)}\nOptionally use --archive_name to specify the archive name.')

return unique_names[0]

Expand Down
13 changes: 8 additions & 5 deletions archiver/integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .listing import parse_tar_listing


def check_integrity(source_path, deep_flag=False, threads=None, work_dir=None):
def check_integrity(source_path, deep_flag=False, threads=None, work_dir=None, archive_name=None):

archives_with_hashes = get_archives_with_hashes_from_path(source_path)
is_encrypted = helpers.path_target_is_encrypted(source_path)
Expand All @@ -20,10 +20,10 @@ def check_integrity(source_path, deep_flag=False, threads=None, work_dir=None):
check_result = shallow_integrity_check(archives_with_hashes, workers=threads)

if source_path.is_dir():
integrity_result = check_archive_list_integrity(source_path)
integrity_result = check_archive_list_integrity(source_path, archive_name)
else:
file_path = source_path.parent / Path(helpers.filename_without_archive_extensions(source_path))
integrity_result = check_archive_part_integrity(file_path)
integrity_result = check_archive_part_integrity(file_path, archive_name)

if not integrity_result:
logging.error(
Expand Down Expand Up @@ -74,10 +74,13 @@ def check_archive_part_integrity(source_name: Path) -> bool:

return check_result

def check_archive_list_integrity(source_path: Path) -> bool:
def check_archive_list_integrity(source_path: Path, archive_name: str = None) -> bool:

parts = helpers.get_parts(source_path)
source_name = helpers.infer_source_name(source_path)
if archive_name is None:
source_name = helpers.infer_source_name(source_path)
else:
source_name = source_path / Path(archive_name)

logging.info(f'Found {parts} parts in archive {source_path.as_posix()}')
check_result = True
Expand Down
3 changes: 2 additions & 1 deletion archiver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def parse_arguments(args):
parser_check.add_argument("archive_dir", type=str, help="Select source archive directory or .tar.lz file")
parser_check.add_argument("-d", "--deep", action="store_true", help="Verify integrity by unpacking archive and hashing each file")
parser_check.add_argument("-n", "--threads", type=int, help=thread_help)
parser_check.add_argument("--archive_name", type=str, help="Provide explicit source name of the archive (if automatic detection fails)")
parser_check.set_defaults(func=handle_check)

# Preparation checks
Expand Down Expand Up @@ -285,7 +286,7 @@ def handle_check(args):
source_path = Path(args.archive_dir)
threads = helpers.get_threads_from_args_or_environment(args.threads)

if not check_integrity(source_path, args.deep, threads, args.work_dir):
if not check_integrity(source_path, args.deep, threads, args.work_dir, args.archive_name):
# return a different error code to the default code of 1 to be able to distinguish
# general errors from a successful run of the program with an unsuccessful outcome
# not taking 2, as it usually stands for command line argument errors
Expand Down

0 comments on commit 48ce9be

Please sign in to comment.