diff --git a/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py b/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py index 2898e48352a..f3c4f92b09e 100644 --- a/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py +++ b/packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py @@ -27,80 +27,6 @@ _logger = logging.getLogger(__name__) - -async def _run_cli_command( - command: str, - *, - output_handlers: list[Callable[[str], Awaitable[None]]] | None = None, -) -> None: - """ - Raises: - ArchiveError: when it fails to execute the command - """ - - process = await asyncio.create_subprocess_shell( - command, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - - async def read_stream( - stream, chunk_size: NonNegativeInt = _DEFAULT_CHUNK_SIZE - ) -> str: - command_output = "" - - # Initialize buffer to store lookbehind window - lookbehind_buffer = "" - - undecodable_chunk: bytes | None = None - - while True: - read_chunk = await stream.read(chunk_size) - if not read_chunk: - # Process remaining buffer if any - if lookbehind_buffer and output_handlers: - await asyncio.gather( - *[handler(lookbehind_buffer) for handler in output_handlers] - ) - break - - try: - if undecodable_chunk: - chunk = (undecodable_chunk + read_chunk).decode("utf-8") - undecodable_chunk = None - else: - chunk = read_chunk.decode("utf-8") - except UnicodeDecodeError: - undecodable_chunk = read_chunk - continue - - command_output += chunk - - # Combine lookbehind buffer with new chunk - chunk_to_emit = lookbehind_buffer + chunk - - if output_handlers: - await asyncio.gather( - *[handler(chunk_to_emit) for handler in output_handlers] - ) - - # Keep last window_size characters for next iteration - lookbehind_buffer = chunk_to_emit[-chunk_size:] - - return command_output - - # Wait for the process to complete and all output to be processed - command_output, _ = await asyncio.gather( - asyncio.create_task(read_stream(process.stdout)), - process.wait(), - ) - - if process.returncode != os.EX_OK: - msg = f"Could not run '{command}' error: '{command_output}'" - raise ArchiveError(msg) - - _TOTAL_BYTES_RE: Final[str] = r" (\d+)\s*bytes " _FILE_COUNT_RE: Final[str] = r" (\d+)\s*files" _PROGRESS_PERCENT_RE: Final[str] = r" (?:100|\d?\d)% " @@ -183,6 +109,85 @@ async def parse_chunk(self, chunk: str) -> None: self.finished_emitted = True +async def _output_reader( + stream, + *, + output_handlers: list[Callable[[str], Awaitable[None]]] | None, + chunk_size: NonNegativeInt = _DEFAULT_CHUNK_SIZE, +) -> str: + command_output = "" + + # Initialize buffer to store lookbehind window + lookbehind_buffer = "" + + undecodable_chunk: bytes | None = None + + while True: + read_chunk = await stream.read(chunk_size) + if not read_chunk: + # Process remaining buffer if any + if lookbehind_buffer and output_handlers: + await asyncio.gather( + *[handler(lookbehind_buffer) for handler in output_handlers] + ) + break + + try: + if undecodable_chunk: + chunk = (undecodable_chunk + read_chunk).decode("utf-8") + undecodable_chunk = None + else: + chunk = read_chunk.decode("utf-8") + except UnicodeDecodeError: + undecodable_chunk = read_chunk + continue + + command_output += chunk + + # Combine lookbehind buffer with new chunk + chunk_to_emit = lookbehind_buffer + chunk + + if output_handlers: + await asyncio.gather( + *[handler(chunk_to_emit) for handler in output_handlers] + ) + + # Keep last window_size characters for next iteration + lookbehind_buffer = chunk_to_emit[-chunk_size:] + + return command_output + + +async def _run_cli_command( + command: str, + *, + output_handlers: list[Callable[[str], Awaitable[None]]] | None = None, +) -> None: + """ + Raises: + ArchiveError: when it fails to execute the command + """ + + process = await asyncio.create_subprocess_shell( + command, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + # Wait for the process to complete and all output to be processed + command_output, _ = await asyncio.gather( + asyncio.create_task( + _output_reader(process.stdout, output_handlers=output_handlers) + ), + process.wait(), + ) + + if process.returncode != os.EX_OK: + msg = f"Could not run '{command}' error: '{command_output}'" + raise ArchiveError(msg) + + async def archive_dir( dir_to_compress: Path, destination: Path,