diff --git a/src/aibs_informatics_aws_utils/data_sync/operations.py b/src/aibs_informatics_aws_utils/data_sync/operations.py
index 235cfcd..7ae3aea 100644
--- a/src/aibs_informatics_aws_utils/data_sync/operations.py
+++ b/src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -1,5 +1,6 @@
 import functools
 import os
+import tempfile
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from pathlib import Path
@@ -7,12 +8,13 @@
 
 from aibs_informatics_core.models.aws.efs import EFSPath
 from aibs_informatics_core.models.aws.s3 import S3URI, S3KeyPrefix
-from aibs_informatics_core.models.data_sync import DataSyncConfig, DataSyncRequest, DataSyncTask
+from aibs_informatics_core.models.data_sync import DataSyncConfig, DataSyncRequest, DataSyncTask, RemoteToLocalConfig
 from aibs_informatics_core.utils.decorators import retry
 from aibs_informatics_core.utils.file_operations import (
     CannotAcquirePathLockError,
     PathLock,
     copy_path,
+    find_filesystem_boundary,
     move_path,
     remove_path,
 )
@@ -80,8 +82,10 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
     def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath):
         self.logger.info(f"Downloading s3 content from {source_path} -> {destination_path}")
         start_time = datetime.now(tz=timezone.utc)
+        destination_path = self.sanitize_local_path(destination_path)
+        source_is_object = is_object(source_path)
 
-        if not is_object(source_path) and not is_folder(source_path):
+        if not source_is_object and not is_folder(source_path):
             message = f"S3 path {source_path} does not exist as object or folder"
             if self.config.fail_if_missing:
                 raise FileNotFoundError(message)
@@ -107,17 +111,48 @@ def sync_paths_with_lock(*args, **kwargs):
 
             _sync_paths = sync_paths_with_lock
 
-        destination_path = self.sanitize_local_path(destination_path)
-
-        _sync_paths(
-            source_path=source_path,
-            destination_path=destination_path,
-            transfer_config=self.s3_transfer_config,
-            config=self.botocore_config,
-            force=self.config.force,
-            size_only=self.config.size_only,
-            delete=True,
-        )
+        remote_to_local_config = self.config.remote_to_local_config
+        if source_is_object and remote_to_local_config.use_custom_tmp_dir:
+            # If our source is an s3 object (not a prefix) and custom object download
+            # logic is enabled (the default), download the object to a temporary
+            # location on the SAME file system as the destination, so the final
+            # os.rename() into place is an atomic same-file-system move.
+            #
+            # This matters because a boto3 download interrupted in a way that defeats
+            # its built-in cleanup strategies leaves a 'partial' file (e.g. `*.6eF5b5da`)
+            # in the SAME parent directory as the intended destination path. Such a file
+            # can be picked up by scientific executables (e.g. cellranger) as invalid input.
+            if remote_to_local_config.custom_tmp_dir is None:
+                custom_tmp_dir = find_filesystem_boundary(destination_path)
+            elif isinstance(remote_to_local_config.custom_tmp_dir, EFSPath):
+                custom_tmp_dir = self.sanitize_local_path(remote_to_local_config.custom_tmp_dir)
+            else:
+                custom_tmp_dir = remote_to_local_config.custom_tmp_dir
+
+            with tempfile.TemporaryDirectory(dir=custom_tmp_dir) as tmp_dir:
+                tmp_destination_path = Path(tmp_dir) / destination_path.name
+                _sync_paths(
+                    source_path=source_path,
+                    destination_path=tmp_destination_path,
+                    transfer_config=self.s3_transfer_config,
+                    config=self.botocore_config,
+                    force=self.config.force,
+                    size_only=self.config.size_only,
+                )
+                destination_path.parent.mkdir(parents=True, exist_ok=True)
+                os.rename(src=tmp_destination_path, dst=destination_path)
+        else:
+            # If our source is a prefix, _sync_paths has built-in logic for deleting
+            # excess files in the destination dir that do not match the source prefix layout.
+            _sync_paths(
+                source_path=source_path,
+                destination_path=destination_path,
+                transfer_config=self.s3_transfer_config,
+                config=self.botocore_config,
+                force=self.config.force,
+                size_only=self.config.size_only,
+                delete=True,
+            )
 
         self.logger.info(f"Updating last modified time on local files to at least {start_time}")
         refresh_local_path__mtime(destination_path, start_time.timestamp())
@@ -221,8 +256,8 @@ def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:
 
     @classmethod
     def sync_request(cls, request: DataSyncRequest):
-        sync_operations = cls(request)
-        sync_operations.sync_task(task=request)
+        sync_operations = cls(config=request.config)
+        sync_operations.sync_task(task=request.task)
 
 
 # We should consider using cloudpathlib[s3] in the future
@@ -236,6 +271,7 @@ def sync_data(
     force: bool = False,
     size_only: bool = False,
     fail_if_missing: bool = True,
+    remote_to_local_config: Optional[RemoteToLocalConfig] = None,
 ):
     request = DataSyncRequest(
         source_path=source_path,
@@ -247,6 +283,7 @@
         force=force,
         size_only=size_only,
         fail_if_missing=fail_if_missing,
+        remote_to_local_config=remote_to_local_config or RemoteToLocalConfig(),
     )
     return DataSyncOperations.sync_request(request=request)
 
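Editorial note: the operations.py change above is a stage-then-rename pattern: download into a temporary directory on the same file system as the destination, then os.rename() the finished file into place. Below is a minimal standalone sketch of that pattern using plain boto3; the `download_atomically` helper and its `bucket`/`key`/`dest` parameters are hypothetical illustrations, not code from this PR.

```python
import os
import tempfile
from pathlib import Path

import boto3


def download_atomically(bucket: str, key: str, dest: Path) -> None:
    """Download an S3 object so that `dest` only ever appears fully written."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Staging next to `dest` keeps the temp file on the same file system, so
    # the os.rename() below is an atomic move; renaming across file systems
    # would instead raise OSError (EXDEV).
    with tempfile.TemporaryDirectory(dir=dest.parent) as tmp_dir:
        tmp_path = Path(tmp_dir) / dest.name
        # If this download is interrupted, the partial file is confined to
        # tmp_dir (cleaned up on exit) instead of sitting beside `dest`.
        boto3.client("s3").download_file(bucket, key, str(tmp_path))
        os.rename(tmp_path, dest)
```

Unlike this sketch, the PR derives the staging directory from find_filesystem_boundary() (or an explicit custom_tmp_dir) rather than dest.parent, which keeps the temporary files out of the destination's immediate parent directory while still guaranteeing a same-file-system rename.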
diff --git a/test/aibs_informatics_aws_utils/data_sync/test_operations.py b/test/aibs_informatics_aws_utils/data_sync/test_operations.py
index 348eac2..4bb8d25 100644
--- a/test/aibs_informatics_aws_utils/data_sync/test_operations.py
+++ b/test/aibs_informatics_aws_utils/data_sync/test_operations.py
@@ -6,6 +6,7 @@
 
 from aibs_informatics_core.models.aws.s3 import S3URI
+from aibs_informatics_core.models.data_sync import RemoteToLocalConfig
 from aibs_informatics_core.utils.os_operations import find_all_paths
 
 from aibs_informatics_aws_utils.data_sync.operations import sync_data
 from aibs_informatics_aws_utils.s3 import get_s3_client, get_s3_resource, is_object, list_s3_paths
@@ -268,6 +269,35 @@ def test__sync_data__s3_to_local__file__does_not_exist(self):
         )
         assert not destination_path.exists()
 
+    def test__sync_data__s3_to_local__file__auto_custom_tmp_dir__succeeds(self):
+        fs = self.setUpLocalFS()
+        self.setUpBucket()
+        source_path = self.put_object("source", "hello")
+        destination_path = fs / "destination"
+        sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            remote_to_local_config=RemoteToLocalConfig(
+                use_custom_tmp_dir=True,
+            ),
+        )
+        self.assertPathsEqual(source_path, destination_path, 1)
+
+    def test__sync_data__s3_to_local__file__specified_custom_tmp_dir__succeeds(self):
+        fs = self.setUpLocalFS()
+        self.setUpBucket()
+        source_path = self.put_object("source", "hello")
+        destination_path = fs / "destination"
+        sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            remote_to_local_config=RemoteToLocalConfig(
+                use_custom_tmp_dir=True,
+                custom_tmp_dir=fs,
+            ),
+        )
+        self.assertPathsEqual(source_path, destination_path, 1)
+
     def test__sync_data__local_to_s3__folder__succeeds(self):
         fs = self.setUpLocalFS()
         self.setUpBucket()
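Editorial note: for reviewers, a caller-side sketch of the new knob, mirroring the added tests. `sync_data` and the `RemoteToLocalConfig` fields come from this diff; the bucket, key, and local paths are hypothetical placeholders.

```python
from pathlib import Path

from aibs_informatics_core.models.aws.s3 import S3URI
from aibs_informatics_core.models.data_sync import RemoteToLocalConfig

from aibs_informatics_aws_utils.data_sync.operations import sync_data

# Default behavior: use_custom_tmp_dir is on, and the staging directory is
# derived from the destination via find_filesystem_boundary().
sync_data(
    source_path=S3URI("s3://example-bucket/source-key"),  # hypothetical object
    destination_path=Path("/mnt/efs/data/destination"),   # hypothetical path
)

# Pin the staging directory explicitly, e.g. to a scratch area on the same
# mount; per the diff, an EFSPath is also accepted and sanitized to a local path.
sync_data(
    source_path=S3URI("s3://example-bucket/source-key"),
    destination_path=Path("/mnt/efs/data/destination"),
    remote_to_local_config=RemoteToLocalConfig(
        use_custom_tmp_dir=True,
        custom_tmp_dir=Path("/mnt/efs/scratch"),  # hypothetical directory
    ),
)

# Opt out to restore the previous download-directly-into-place behavior.
sync_data(
    source_path=S3URI("s3://example-bucket/source-key"),
    destination_path=Path("/mnt/efs/data/destination"),
    remote_to_local_config=RemoteToLocalConfig(use_custom_tmp_dir=False),
)
```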