From 8e298a3a937e6dff0e075b1facf2d0045c26ab1d Mon Sep 17 00:00:00 2001
From: Ryan McGinty
Date: Wed, 6 Nov 2024 18:54:28 -0800
Subject: [PATCH 1/4] cache botocore config

---
 src/aibs_informatics_aws_utils/data_sync/operations.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/aibs_informatics_aws_utils/data_sync/operations.py b/src/aibs_informatics_aws_utils/data_sync/operations.py
index 9f5b769..e3956ba 100644
--- a/src/aibs_informatics_aws_utils/data_sync/operations.py
+++ b/src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -46,6 +46,11 @@
 LocalPath = Union[Path, EFSPath]
 
 
+@functools.cache
+def get_botocore_config(max_pool_connections: int) -> Config:
+    return Config(max_pool_connections=max_pool_connections)
+
+
 @dataclass
 class DataSyncOperations(LoggingMixin):
     config: DataSyncConfig
@@ -56,7 +61,7 @@ def s3_transfer_config(self) -> TransferConfig:
 
     @property
     def botocore_config(self) -> Config:
-        return Config(max_pool_connections=self.config.max_concurrency)
+        return get_botocore_config(max_pool_connections=self.config.max_concurrency)
 
     def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
         source_path = self.sanitize_local_path(source_path)

From 74a9b68edbf52df77aa9c9799b23962994459529 Mon Sep 17 00:00:00 2001
From: Ryan McGinty
Date: Fri, 8 Nov 2024 14:34:12 -0800
Subject: [PATCH 2/4] updating result returned from data sync ops

---
 .../data_sync/operations.py      | 89 ++++++++++++++-----
 .../data_sync/test_operations.py | 27 ++++--
 2 files changed, 89 insertions(+), 27 deletions(-)

diff --git a/src/aibs_informatics_aws_utils/data_sync/operations.py b/src/aibs_informatics_aws_utils/data_sync/operations.py
index e3956ba..cde9e3b 100644
--- a/src/aibs_informatics_aws_utils/data_sync/operations.py
+++ b/src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -8,12 +8,16 @@
 
 from aibs_informatics_core.models.aws.efs import EFSPath
 from aibs_informatics_core.models.aws.s3 import S3URI, S3KeyPrefix
+<<<<<<< HEAD
 from aibs_informatics_core.models.data_sync import (
     DataSyncConfig,
     DataSyncRequest,
     DataSyncTask,
     RemoteToLocalConfig,
 )
+=======
+from aibs_informatics_core.models.data_sync import DataSyncConfig, DataSyncRequest, DataSyncTask, DataSyncResult
+>>>>>>> d255aa1 (updating result returned from data sync ops)
 from aibs_informatics_core.utils.decorators import retry
 from aibs_informatics_core.utils.file_operations import (
     CannotAcquirePathLockError,
@@ -22,6 +26,7 @@
     find_filesystem_boundary,
     move_path,
     remove_path,
+    get_path_size_bytes,
 )
 from aibs_informatics_core.utils.logging import LoggingMixin, get_logger
 from aibs_informatics_core.utils.os_operations import find_all_paths
@@ -34,6 +39,7 @@
     is_folder,
     is_object,
     sync_paths,
+    get_s3_path_stats,
 )
 
 logger = get_logger(__name__)
@@ -63,13 +69,16 @@ def s3_transfer_config(self) -> TransferConfig:
     def botocore_config(self) -> Config:
         return get_botocore_config(max_pool_connections=self.config.max_concurrency)
 
-    def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
+    def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI) -> DataSyncResult:
         source_path = self.sanitize_local_path(source_path)
         if not source_path.exists():
             if self.config.fail_if_missing:
                 raise FileNotFoundError(f"Local path {source_path} does not exist")
             self.logger.warning(f"Local path {source_path} does not exist")
-            return
+            if self.config.include_detailed_response:
+                return DataSyncResult(bytes_transferred=0, files_transferred=0)
+            else:
+                return DataSyncResult()
         if source_path.is_dir():
             self.logger.info("local source path is folder. Adding suffix to destination path")
             destination_path = S3URI.build(
@@ -86,10 +95,15 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
             size_only=self.config.size_only,
             delete=True,
         )
+        result = DataSyncResult()
+        if self.config.include_detailed_response:
+            result.files_transferred = len(find_all_paths(source_path, include_dirs=False))
+            result.bytes_transferred = get_path_size_bytes(source_path)
         if not self.config.retain_source_data:
             remove_path(source_path)
-
-    def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath):
+        return result
+
+    def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath) -> DataSyncResult:
         self.logger.info(f"Downloading s3 content from {source_path} -> {destination_path}")
         start_time = datetime.now(tz=timezone.utc)
         destination_path = self.sanitize_local_path(destination_path)
@@ -100,7 +114,10 @@ def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath):
             if self.config.fail_if_missing:
                 raise FileNotFoundError(message)
             self.logger.warning(message)
-            return
+            if self.config.include_detailed_response:
+                return DataSyncResult(bytes_transferred=0, files_transferred=0)
+            else:
+                return DataSyncResult()
 
         _sync_paths = sync_paths
 
@@ -173,7 +190,14 @@ def sync_paths_with_lock(*args, **kwargs):
                 "Deleting s3 objects not allowed when downloading them to local file system"
             )
 
-    def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPath):
+        result = DataSyncResult()
+        # Collecting stats for detailed response
+        if self.config.include_detailed_response:
+            result.files_transferred = len(find_all_paths(destination_path, include_dirs=False))
+            result.bytes_transferred = get_path_size_bytes(destination_path)
+        return result
+
+    def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPath) -> DataSyncResult:
         source_path = self.sanitize_local_path(source_path)
         destination_path = self.sanitize_local_path(destination_path)
         self.logger.info(f"Copying local content from {source_path} -> {destination_path}")
@@ -183,7 +207,7 @@ def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPat
             if self.config.fail_if_missing:
                 raise FileNotFoundError(f"Local path {source_path} does not exist")
             self.logger.warning(f"Local path {source_path} does not exist")
-            return
+            return DataSyncResult(bytes_transferred=0)
 
         if self.config.retain_source_data:
             copy_path(source_path=source_path, destination_path=destination_path, exists_ok=True)
@@ -191,13 +215,20 @@ def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPat
             move_path(source_path=source_path, destination_path=destination_path, exists_ok=True)
         self.logger.info(f"Updating last modified time on local files to at least {start_time}")
         refresh_local_path__mtime(destination_path, start_time.timestamp())
+
+        result = DataSyncResult()
+        # Collecting stats for detailed response
+        if self.config.include_detailed_response:
+            result.files_transferred = len(find_all_paths(source_path, include_dirs=False))
+            result.bytes_transferred = get_path_size_bytes(source_path)
+        return result
 
     def sync_s3_to_s3(
         self,
         source_path: S3URI,
         destination_path: S3URI,
         source_path_prefix: Optional[S3KeyPrefix] = None,
-    ):
+    ) -> DataSyncResult:
        self.logger.info(f"Syncing s3 content from {source_path} -> {destination_path}")
 
         if not is_object(source_path) and not is_folder(source_path):
@@ -205,8 +236,11 @@
             if self.config.fail_if_missing:
                 raise FileNotFoundError(message)
             self.logger.warning(message)
-            return
-
+            if self.config.include_detailed_response:
+                return DataSyncResult(bytes_transferred=0, files_transferred=0)
+            else:
+                return DataSyncResult()
+
         sync_paths(
             source_path=source_path,
             destination_path=destination_path,
@@ -219,43 +253,59 @@ def sync_s3_to_s3(
         )
         if not self.config.retain_source_data:
             delete_s3_path(s3_path=source_path)
+
+        result = DataSyncResult()
+        if self.config.include_detailed_response:
+            path_stats = get_s3_path_stats(destination_path)
+            result.files_transferred = path_stats.object_count
+            result.bytes_transferred = path_stats.size_bytes
+        return result
 
     def sync(
         self,
         source_path: Union[LocalPath, S3URI],
         destination_path: Union[LocalPath, S3URI],
         source_path_prefix: Optional[str] = None,
-    ):
+    ) -> DataSyncResult:
         if isinstance(source_path, S3URI) and isinstance(destination_path, S3URI):
-            self.sync_s3_to_s3(
+            return self.sync_s3_to_s3(
                 source_path=source_path,
                 destination_path=destination_path,
                 source_path_prefix=S3KeyPrefix(source_path_prefix) if source_path_prefix else None,
             )
         elif isinstance(source_path, S3URI):
-            self.sync_s3_to_local(
+            return self.sync_s3_to_local(
                 source_path=source_path,
                 destination_path=cast(LocalPath, destination_path),
             )
         elif isinstance(destination_path, S3URI):
-            self.sync_local_to_s3(
+            return self.sync_local_to_s3(
                 source_path=cast(LocalPath, source_path),
                 destination_path=destination_path,
             )
         else:
-            self.sync_local_to_local(
+            return self.sync_local_to_local(
                 source_path=source_path,
                 destination_path=destination_path,
             )
 
-    def sync_task(self, task: DataSyncTask):
+    def sync_task(self, task: DataSyncTask) -> DataSyncResult:
         return self.sync(
             source_path=task.source_path,
             destination_path=task.destination_path,
             source_path_prefix=task.source_path_prefix,
         )
 
+    @classmethod
+    def sync_request(cls, request: DataSyncRequest) -> DataSyncResult:
+        sync_operations = cls(config=request.config)
+        return sync_operations.sync_task(task=request.task)
+
+    # -----------------------------------
+    # Helper methods
+    # -----------------------------------
+
     def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:
         if isinstance(path, EFSPath):
             self.logger.info(f"Sanitizing efs path {path}")
@@ -264,11 +314,6 @@ def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:
             return new_path
         return path
 
-    @classmethod
-    def sync_request(cls, request: DataSyncRequest):
-        sync_operations = cls(config=request.config)
-        sync_operations.sync_task(task=request.task)
-
 
 # We should consider using cloudpathlib[s3] in the future
 def sync_data(
@@ -282,6 +327,7 @@
     size_only: bool = False,
     fail_if_missing: bool = True,
     remote_to_local_config: Optional[RemoteToLocalConfig] = None,
+    include_detailed_response: bool = False,
 ):
     request = DataSyncRequest(
         source_path=source_path,
@@ -294,6 +340,7 @@
         size_only=size_only,
         fail_if_missing=fail_if_missing,
         remote_to_local_config=remote_to_local_config or RemoteToLocalConfig(),
+        include_detailed_response=include_detailed_response,
     )
     return DataSyncOperations.sync_request(request=request)
 
diff --git a/test/aibs_informatics_aws_utils/data_sync/test_operations.py b/test/aibs_informatics_aws_utils/data_sync/test_operations.py
index 7e45649..7f35a23 100644
--- a/test/aibs_informatics_aws_utils/data_sync/test_operations.py
+++ b/test/aibs_informatics_aws_utils/data_sync/test_operations.py
@@ -138,11 +138,14 @@ def test__sync_data__local_to_local__file__succeeds(self):
         destination_path = fs / "destination"
         self.put_file(source_path, "hello")
 
-        sync_data(
+        result = sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=True,
         )
         self.assertPathsEqual(source_path, destination_path, 1)
+        self.assertEqual(result.files_transferred, 1)
+        self.assertEqual(result.bytes_transferred, 5)
 
     def test__sync_data__local_to_local__relative_file__succeeds(self):
         fs = self.setUpLocalFS()
@@ -150,11 +153,14 @@
         destination_path = fs / "destination"
         self.put_file(source_path, "hello")
         with self.chdir(fs):
-            sync_data(
+            result = sync_data(
                 source_path=Path("source"),
                 destination_path=Path("destination"),
+                include_detailed_response=True,
             )
         self.assertPathsEqual(source_path, destination_path, 1)
+        self.assertEqual(result.files_transferred, 1)
+        self.assertEqual(result.bytes_transferred, 5)
 
     def test__sync_data__local_to_local__file__source_deleted(self):
         fs = self.setUpLocalFS()
@@ -162,7 +168,7 @@
         destination_path = fs / "destination"
         self.put_file(source_path, "hello")
 
-        sync_data(
+        result = sync_data(
             source_path=source_path,
             destination_path=destination_path,
             retain_source_data=False,
@@ -192,11 +198,14 @@ def test__sync_data__s3_to_local__folder__succeeds(self):
         self.put_object("source/path/dir1/obj2", "did you hear me")
         destination_path = fs / "destination2"
 
-        sync_data(
+        result = sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=True,
         )
         self.assertPathsEqual(source_path, destination_path, 2)
+        self.assertEqual(result.files_transferred, 2)
+        self.assertEqual(result.bytes_transferred, 20)
 
     def test__sync_data__s3_to_local__folder__cached_results_mtime_updated(self):
         fs = self.setUpLocalFS()
@@ -206,11 +215,14 @@
         self.put_object("source/path/dir1/obj2", "did you hear me")
         destination_path = fs / "destination"
 
-        sync_data(
+        result = sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=True,
         )
         self.assertPathsEqual(source_path, destination_path, 2)
+        self.assertEqual(result.files_transferred, 2)
+        self.assertEqual(result.bytes_transferred, 20)
 
         sync_data(
             source_path=source_path,
             destination_path=destination_path,
@@ -223,11 +235,14 @@ def test__sync_data__s3_to_local__file__succeeds(self):
         self.setUpBucket()
         source_path = self.put_object("source", "hello")
         destination_path = fs / "destination"
-        sync_data(
+        result = sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=True,
         )
         self.assertPathsEqual(source_path, destination_path, 1)
+        self.assertEqual(result.files_transferred, 1)
+        self.assertEqual(result.bytes_transferred, 5)
 
     def test__sync_data__s3_to_local__file__lock_required__succeeds(self):
         fs = self.setUpLocalFS()

From d982056b4833dc0cbeb02bf1a4f830c6c9e1adea Mon Sep 17 00:00:00 2001
From: Ryan McGinty
Date: Fri, 8 Nov 2024 14:35:35 -0800
Subject: [PATCH 3/4] formatting and adding more tests

---
 .../data_sync/operations.py      | 27 ++++---
 .../data_sync/test_operations.py | 76 +++++++++++++++++++
 2 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/src/aibs_informatics_aws_utils/data_sync/operations.py b/src/aibs_informatics_aws_utils/data_sync/operations.py
index cde9e3b..122a7a8 100644
--- a/src/aibs_informatics_aws_utils/data_sync/operations.py
+++ b/src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -8,25 +8,22 @@
 
 from aibs_informatics_core.models.aws.efs import EFSPath
 from aibs_informatics_core.models.aws.s3 import S3URI, S3KeyPrefix
-<<<<<<< HEAD
 from aibs_informatics_core.models.data_sync import (
     DataSyncConfig,
     DataSyncRequest,
+    DataSyncResult,
     DataSyncTask,
     RemoteToLocalConfig,
 )
-=======
-from aibs_informatics_core.models.data_sync import DataSyncConfig, DataSyncRequest, DataSyncTask, DataSyncResult
->>>>>>> d255aa1 (updating result returned from data sync ops)
 from aibs_informatics_core.utils.decorators import retry
 from aibs_informatics_core.utils.file_operations import (
     CannotAcquirePathLockError,
     PathLock,
     copy_path,
     find_filesystem_boundary,
+    get_path_size_bytes,
     move_path,
     remove_path,
-    get_path_size_bytes,
 )
 from aibs_informatics_core.utils.logging import LoggingMixin, get_logger
 from aibs_informatics_core.utils.os_operations import find_all_paths
@@ -36,10 +33,10 @@
     Config,
     TransferConfig,
     delete_s3_path,
+    get_s3_path_stats,
     is_folder,
     is_object,
     sync_paths,
-    get_s3_path_stats,
 )
 
 logger = get_logger(__name__)
@@ -102,7 +99,7 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI) -> D
         if not self.config.retain_source_data:
             remove_path(source_path)
         return result
-    
+
     def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath) -> DataSyncResult:
         self.logger.info(f"Downloading s3 content from {source_path} -> {destination_path}")
         start_time = datetime.now(tz=timezone.utc)
@@ -191,13 +188,15 @@ def sync_paths_with_lock(*args, **kwargs):
             )
 
         result = DataSyncResult()
-        # Collecting stats for detailed response 
+        # Collecting stats for detailed response
         if self.config.include_detailed_response:
             result.files_transferred = len(find_all_paths(destination_path, include_dirs=False))
             result.bytes_transferred = get_path_size_bytes(destination_path)
         return result
 
-    def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPath) -> DataSyncResult:
+    def sync_local_to_local(
+        self, source_path: LocalPath, destination_path: LocalPath
+    ) -> DataSyncResult:
         source_path = self.sanitize_local_path(source_path)
         destination_path = self.sanitize_local_path(destination_path)
         self.logger.info(f"Copying local content from {source_path} -> {destination_path}")
@@ -215,9 +214,9 @@ def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPat
             move_path(source_path=source_path, destination_path=destination_path, exists_ok=True)
         self.logger.info(f"Updating last modified time on local files to at least {start_time}")
         refresh_local_path__mtime(destination_path, start_time.timestamp())
-        
+
         result = DataSyncResult()
-        # Collecting stats for detailed response 
+        # Collecting stats for detailed response
         if self.config.include_detailed_response:
             result.files_transferred = len(find_all_paths(source_path, include_dirs=False))
             result.bytes_transferred = get_path_size_bytes(source_path)
@@ -240,7 +239,7 @@ def sync_s3_to_s3(
                 return DataSyncResult(bytes_transferred=0, files_transferred=0)
             else:
                 return DataSyncResult()
-        
+
         sync_paths(
             source_path=source_path,
             destination_path=destination_path,
@@ -253,7 +252,7 @@ def sync_s3_to_s3(
         )
         if not self.config.retain_source_data:
             delete_s3_path(s3_path=source_path)
-        
+
         result = DataSyncResult()
         if self.config.include_detailed_response:
             path_stats = get_s3_path_stats(destination_path)
@@ -305,7 +304,7 @@ def sync_request(cls, request: DataSyncRequest) -> DataSyncResult:
     # -----------------------------------
     # Helper methods
     # -----------------------------------
-    
+
     def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:
         if isinstance(path, EFSPath):
             self.logger.info(f"Sanitizing efs path {path}")
diff --git a/test/aibs_informatics_aws_utils/data_sync/test_operations.py b/test/aibs_informatics_aws_utils/data_sync/test_operations.py
index 7f35a23..ce46036 100644
--- a/test/aibs_informatics_aws_utils/data_sync/test_operations.py
+++ b/test/aibs_informatics_aws_utils/data_sync/test_operations.py
@@ -72,6 +72,21 @@ def client__list_objects_v2(self, **kwargs):
         return self.s3_client.list_objects_v2(**kwargs)
 
     def test__sync_data__s3_to_s3__folder__succeeds(self):
+        self.setUpBucket()
+        source_path = self.get_s3_path("source/path/")
+        destination_path = self.get_s3_path("destination/path/")
+        path1 = self.put_object("source/path/obj1", "hello")
+        path2 = self.put_object("source/path/dir1/obj2", "did you hear me")
+        result = sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            include_detailed_response=True,
+        )
+        self.assertPathsEqual(source_path, destination_path, 2)
+        self.assertEqual(result.files_transferred, 2)
+        self.assertEqual(result.bytes_transferred, 20)
+
+    def test__sync_data__s3_to_s3__folder__succeeds__no_detailed_response(self):
         self.setUpBucket()
         source_path = self.get_s3_path("source/path/")
         destination_path = self.get_s3_path("destination/path/")
@@ -80,20 +95,50 @@
         path1 = self.put_object("source/path/obj1", "hello")
         path2 = self.put_object("source/path/dir1/obj2", "did you hear me")
         sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=False,
         )
         self.assertPathsEqual(source_path, destination_path, 2)
 
     def test__sync_data__s3_to_s3__file__succeeds(self):
+        self.setUpBucket()
+        source_path = self.put_object("source/path/obj1", "hello")
+        destination_path = self.get_s3_path("destination/path/")
+        result = sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            include_detailed_response=True,
+        )
+        self.assertPathsEqual(source_path, destination_path, 1)
+        self.assertEqual(result.files_transferred, 1)
+        self.assertEqual(result.bytes_transferred, 5)
+
+    def test__sync_data__s3_to_s3__file__succeeds__no_detailed_response(self):
         self.setUpBucket()
         source_path = self.put_object("source/path/obj1", "hello")
         destination_path = self.get_s3_path("destination/path/")
         sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=False,
         )
         self.assertPathsEqual(source_path, destination_path, 1)
 
     def test__sync_data__s3_to_s3__file__succeeds__source_deleted(self):
+        self.setUpBucket()
+        source_path = self.put_object("source/path/obj1", "hello")
+        destination_path = self.get_s3_path("destination/path/")
+        result = sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            retain_source_data=False,
+            include_detailed_response=True,
+        )
+        assert self.get_object(destination_path.key) == "hello"
+        assert not is_object(source_path)
+        self.assertEqual(result.files_transferred, 1)
+        self.assertEqual(result.bytes_transferred, 5)
+
+    def test__sync_data__s3_to_s3__file__succeeds__source_deleted__no_detailed_response(self):
         self.setUpBucket()
         source_path = self.put_object("source/path/obj1", "hello")
         destination_path = self.get_s3_path("destination/path/")
@@ -101,6 +146,7 @@
             source_path=source_path,
             destination_path=destination_path,
             retain_source_data=False,
+            include_detailed_response=False,
         )
         assert self.get_object(destination_path.key) == "hello"
         assert not is_object(source_path)
@@ -126,9 +172,26 @@ def test__sync_data__local_to_local__folder__succeeds(self):
         self.put_file(source_path / "file1", "hello")
         self.put_file(source_path / "file2", "did you hear me")
 
+        result = sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            include_detailed_response=True,
+        )
+        self.assertPathsEqual(source_path, destination_path, 2)
+        self.assertEqual(result.files_transferred, 2)
+        self.assertEqual(result.bytes_transferred, 20)
+
+    def test__sync_data__local_to_local__folder__succeeds__no_detailed_response(self):
+        fs = self.setUpLocalFS()
+        source_path = fs / "source"
+        destination_path = fs / "destination"
+        self.put_file(source_path / "file1", "hello")
+        self.put_file(source_path / "file2", "did you hear me")
+
         sync_data(
             source_path=source_path,
             destination_path=destination_path,
+            include_detailed_response=False,
         )
         self.assertPathsEqual(source_path, destination_path, 2)
 
@@ -147,6 +210,19 @@ def test__sync_data__local_to_local__file__succeeds(self):
         self.assertEqual(result.files_transferred, 1)
         self.assertEqual(result.bytes_transferred, 5)
 
+    def test__sync_data__local_to_local__file__succeeds__no_detailed_response(self):
+        fs = self.setUpLocalFS()
+        source_path = fs / "source"
+        destination_path = fs / "destination"
+        self.put_file(source_path, "hello")
+
+        sync_data(
+            source_path=source_path,
+            destination_path=destination_path,
+            include_detailed_response=False,
+        )
+        self.assertPathsEqual(source_path, destination_path, 1)
+
     def test__sync_data__local_to_local__relative_file__succeeds(self):
         fs = self.setUpLocalFS()
         source_path = fs / "source"

From b40d6a8ff4740d46e768d7bf0b933110babf12ed Mon Sep 17 00:00:00 2001
From: Ryan McGinty
Date: Mon, 11 Nov 2024 17:13:12 -0800
Subject: [PATCH 4/4] pr updates

---
 src/aibs_informatics_aws_utils/data_sync/operations.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/aibs_informatics_aws_utils/data_sync/operations.py b/src/aibs_informatics_aws_utils/data_sync/operations.py
index 122a7a8..d5a53af 100644
--- a/src/aibs_informatics_aws_utils/data_sync/operations.py
+++ b/src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -27,10 +27,10 @@
 )
 from aibs_informatics_core.utils.logging import LoggingMixin, get_logger
 from aibs_informatics_core.utils.os_operations import find_all_paths
+from botocore.client import Config
 
 from aibs_informatics_aws_utils.efs import get_local_path
 from aibs_informatics_aws_utils.s3 import (
-    Config,
     TransferConfig,
     delete_s3_path,
     get_s3_path_stats,
@@ -50,8 +50,8 @@
 
 
 @functools.cache
-def get_botocore_config(max_pool_connections: int) -> Config:
-    return Config(max_pool_connections=max_pool_connections)
+def get_botocore_config(max_pool_connections: int, **kwargs) -> Config:
+    return Config(max_pool_connections=max_pool_connections, **kwargs)
 
 
 @dataclass
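
Usage sketch (illustrative; not part of any patch above). PATCH 1/4 memoizes the
botocore Config with functools.cache, and PATCH 4/4 forwards extra keyword
arguments to it. Because functools.cache keys on the call arguments, identical
calls share a single Config object, and every forwarded value must be hashable.
Assuming the src layout maps to the import path
aibs_informatics_aws_utils.data_sync.operations:

    from aibs_informatics_aws_utils.data_sync.operations import get_botocore_config

    # Identical arguments hit the cache and return the very same Config object,
    # so per-sync construction cost is paid once per argument combination.
    config_a = get_botocore_config(max_pool_connections=50)
    config_b = get_botocore_config(max_pool_connections=50)
    assert config_a is config_b

    # Extra keyword arguments (PATCH 4/4) are forwarded to botocore's Config;
    # each distinct, hashable combination gets its own cache entry.
    config_c = get_botocore_config(max_pool_connections=50, connect_timeout=5)
    assert config_c is not config_a

    # Caveat: unhashable values cannot be part of a cache key, so a dict-valued
    # option such as botocore's retries={"max_attempts": 3} would raise
    # TypeError inside the cache wrapper before Config is ever constructed.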
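
A second sketch for the DataSyncResult plumbing from PATCH 2/4 and PATCH 3/4,
assuming DataSyncResult (imported from aibs_informatics_core.models.data_sync
above) exposes the files_transferred and bytes_transferred fields that the
tests assert on; the local paths here are hypothetical:

    from pathlib import Path

    from aibs_informatics_aws_utils.data_sync.operations import sync_data

    # Transfer stats are opt-in: without include_detailed_response=True the
    # returned DataSyncResult is constructed without counts, and the extra
    # find/stat pass over the synced files is skipped.
    result = sync_data(
        source_path=Path("/tmp/example-source"),
        destination_path=Path("/tmp/example-destination"),
        include_detailed_response=True,
    )
    print(f"{result.files_transferred} files, {result.bytes_transferred} bytes")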