Skip to content

Commit

Permalink
Merge pull request #18 from AllenInstitute/bugfix/data-sync-memory-leak
Browse files Browse the repository at this point in the history
Data Sync Operation updates - mem leak fix and introduction to results response
  • Loading branch information
rpmcginty authored Nov 12, 2024
2 parents 3346348 + b40d6a8 commit 1e50004
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 26 deletions.
93 changes: 72 additions & 21 deletions src/aibs_informatics_aws_utils/data_sync/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aibs_informatics_core.models.data_sync import (
DataSyncConfig,
DataSyncRequest,
DataSyncResult,
DataSyncTask,
RemoteToLocalConfig,
)
Expand All @@ -20,17 +21,19 @@
PathLock,
copy_path,
find_filesystem_boundary,
get_path_size_bytes,
move_path,
remove_path,
)
from aibs_informatics_core.utils.logging import LoggingMixin, get_logger
from aibs_informatics_core.utils.os_operations import find_all_paths
from botocore.client import Config

from aibs_informatics_aws_utils.efs import get_local_path
from aibs_informatics_aws_utils.s3 import (
Config,
TransferConfig,
delete_s3_path,
get_s3_path_stats,
is_folder,
is_object,
sync_paths,
Expand All @@ -46,6 +49,11 @@
LocalPath = Union[Path, EFSPath]


@functools.cache
def get_botocore_config(max_pool_connections: int, **kwargs) -> Config:
return Config(max_pool_connections=max_pool_connections, **kwargs)


@dataclass
class DataSyncOperations(LoggingMixin):
config: DataSyncConfig
Expand All @@ -56,15 +64,18 @@ def s3_transfer_config(self) -> TransferConfig:

@property
def botocore_config(self) -> Config:
return Config(max_pool_connections=self.config.max_concurrency)
return get_botocore_config(max_pool_connections=self.config.max_concurrency)

def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI) -> DataSyncResult:
source_path = self.sanitize_local_path(source_path)
if not source_path.exists():
if self.config.fail_if_missing:
raise FileNotFoundError(f"Local path {source_path} does not exist")
self.logger.warning(f"Local path {source_path} does not exist")
return
if self.config.include_detailed_response:
return DataSyncResult(bytes_transferred=0, files_transferred=0)
else:
return DataSyncResult()
if source_path.is_dir():
self.logger.info("local source path is folder. Adding suffix to destination path")
destination_path = S3URI.build(
Expand All @@ -81,10 +92,15 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
size_only=self.config.size_only,
delete=True,
)
result = DataSyncResult()
if self.config.include_detailed_response:
result.files_transferred = len(find_all_paths(source_path, include_dirs=False))
result.bytes_transferred = get_path_size_bytes(source_path)
if not self.config.retain_source_data:
remove_path(source_path)
return result

def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath):
def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath) -> DataSyncResult:
self.logger.info(f"Downloading s3 content from {source_path} -> {destination_path}")
start_time = datetime.now(tz=timezone.utc)
destination_path = self.sanitize_local_path(destination_path)
Expand All @@ -95,7 +111,10 @@ def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath):
if self.config.fail_if_missing:
raise FileNotFoundError(message)
self.logger.warning(message)
return
if self.config.include_detailed_response:
return DataSyncResult(bytes_transferred=0, files_transferred=0)
else:
return DataSyncResult()

_sync_paths = sync_paths

Expand Down Expand Up @@ -168,7 +187,16 @@ def sync_paths_with_lock(*args, **kwargs):
"Deleting s3 objects not allowed when downloading them to local file system"
)

def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPath):
result = DataSyncResult()
# Collecting stats for detailed response
if self.config.include_detailed_response:
result.files_transferred = len(find_all_paths(destination_path, include_dirs=False))
result.bytes_transferred = get_path_size_bytes(destination_path)
return result

def sync_local_to_local(
self, source_path: LocalPath, destination_path: LocalPath
) -> DataSyncResult:
source_path = self.sanitize_local_path(source_path)
destination_path = self.sanitize_local_path(destination_path)
self.logger.info(f"Copying local content from {source_path} -> {destination_path}")
Expand All @@ -178,7 +206,7 @@ def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPat
if self.config.fail_if_missing:
raise FileNotFoundError(f"Local path {source_path} does not exist")
self.logger.warning(f"Local path {source_path} does not exist")
return
return DataSyncResult(bytes_transferred=0)

if self.config.retain_source_data:
copy_path(source_path=source_path, destination_path=destination_path, exists_ok=True)
Expand All @@ -187,20 +215,30 @@ def sync_local_to_local(self, source_path: LocalPath, destination_path: LocalPat
self.logger.info(f"Updating last modified time on local files to at least {start_time}")
refresh_local_path__mtime(destination_path, start_time.timestamp())

result = DataSyncResult()
# Collecting stats for detailed response
if self.config.include_detailed_response:
result.files_transferred = len(find_all_paths(source_path, include_dirs=False))
result.bytes_transferred = get_path_size_bytes(source_path)
return result

def sync_s3_to_s3(
self,
source_path: S3URI,
destination_path: S3URI,
source_path_prefix: Optional[S3KeyPrefix] = None,
):
) -> DataSyncResult:
self.logger.info(f"Syncing s3 content from {source_path} -> {destination_path}")

if not is_object(source_path) and not is_folder(source_path):
message = f"S3 path {source_path} does not exist as object or folder"
if self.config.fail_if_missing:
raise FileNotFoundError(message)
self.logger.warning(message)
return
if self.config.include_detailed_response:
return DataSyncResult(bytes_transferred=0, files_transferred=0)
else:
return DataSyncResult()

sync_paths(
source_path=source_path,
Expand All @@ -215,42 +253,58 @@ def sync_s3_to_s3(
if not self.config.retain_source_data:
delete_s3_path(s3_path=source_path)

result = DataSyncResult()
if self.config.include_detailed_response:
path_stats = get_s3_path_stats(destination_path)
result.files_transferred = path_stats.object_count
result.bytes_transferred = path_stats.size_bytes
return result

def sync(
self,
source_path: Union[LocalPath, S3URI],
destination_path: Union[LocalPath, S3URI],
source_path_prefix: Optional[str] = None,
):
) -> DataSyncResult:
if isinstance(source_path, S3URI) and isinstance(destination_path, S3URI):
self.sync_s3_to_s3(
return self.sync_s3_to_s3(
source_path=source_path,
destination_path=destination_path,
source_path_prefix=S3KeyPrefix(source_path_prefix) if source_path_prefix else None,
)

elif isinstance(source_path, S3URI):
self.sync_s3_to_local(
return self.sync_s3_to_local(
source_path=source_path,
destination_path=cast(LocalPath, destination_path),
)
elif isinstance(destination_path, S3URI):
self.sync_local_to_s3(
return self.sync_local_to_s3(
source_path=cast(LocalPath, source_path),
destination_path=destination_path,
)
else:
self.sync_local_to_local(
return self.sync_local_to_local(
source_path=source_path,
destination_path=destination_path,
)

def sync_task(self, task: DataSyncTask):
def sync_task(self, task: DataSyncTask) -> DataSyncResult:
return self.sync(
source_path=task.source_path,
destination_path=task.destination_path,
source_path_prefix=task.source_path_prefix,
)

@classmethod
def sync_request(cls, request: DataSyncRequest) -> DataSyncResult:
sync_operations = cls(config=request.config)
return sync_operations.sync_task(task=request.task)

# -----------------------------------
# Helper methods
# -----------------------------------

def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:
if isinstance(path, EFSPath):
self.logger.info(f"Sanitizing efs path {path}")
Expand All @@ -259,11 +313,6 @@ def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:
return new_path
return path

@classmethod
def sync_request(cls, request: DataSyncRequest):
sync_operations = cls(config=request.config)
sync_operations.sync_task(task=request.task)


# We should consider using cloudpathlib[s3] in the future
def sync_data(
Expand All @@ -277,6 +326,7 @@ def sync_data(
size_only: bool = False,
fail_if_missing: bool = True,
remote_to_local_config: Optional[RemoteToLocalConfig] = None,
include_detailed_response: bool = False,
):
request = DataSyncRequest(
source_path=source_path,
Expand All @@ -289,6 +339,7 @@ def sync_data(
size_only=size_only,
fail_if_missing=fail_if_missing,
remote_to_local_config=remote_to_local_config or RemoteToLocalConfig(),
include_detailed_response=include_detailed_response,
)
return DataSyncOperations.sync_request(request=request)

Expand Down
Loading

0 comments on commit 1e50004

Please sign in to comment.