Merge pull request #4 from AllenInstitute/feature/add-size-only-field-to-s3-sync-paths

expose and use size-only in data sync s3 sync paths
rpmcginty authored Jun 12, 2024
2 parents d9e5c8f + 13573e1 commit 71c8204
Showing 2 changed files with 63 additions and 22 deletions.
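
For context, a minimal sketch of how the newly exposed flags can be driven from the public entry point (the paths and bucket are hypothetical, and plain strings are shown for brevity where the real signature uses the package's path types):

    from aibs_informatics_aws_utils.data_sync.operations import sync_data

    sync_data(
        source_path="/tmp/dataset",                      # hypothetical local source
        destination_path="s3://example-bucket/dataset",  # hypothetical S3 destination
        force=False,      # skip files that already appear to be in sync
        size_only=True,   # compare by object size only, ignoring mtime/etag
    )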
21 changes: 14 additions & 7 deletions src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -57,7 +57,8 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
destination_path=destination_path,
transfer_config=self.s3_transfer_config,
config=self.botocore_config,
- force=False,
+ force=self.config.force,
+ size_only=self.config.size_only,
delete=True,
)
if not self.config.retain_source_data:
@@ -93,7 +94,8 @@ def sync_paths_with_lock(*args, **kwargs):
destination_path=destination_path,
transfer_config=self.s3_transfer_config,
config=self.botocore_config,
- force=False,
+ force=self.config.force,
+ size_only=self.config.size_only,
delete=True,
)

@@ -131,7 +133,8 @@ def sync_s3_to_s3(
source_path_prefix=source_path_prefix,
transfer_config=self.s3_transfer_config,
config=self.botocore_config,
- force=False,
+ force=self.config.force,
+ size_only=self.config.size_only,
delete=True,
)
if not self.config.retain_source_data:
@@ -195,21 +198,25 @@ def sync_data(
max_concurrency: int = 10,
retain_source_data: bool = True,
require_lock: bool = False,
+ force: bool = False,
+ size_only: bool = False,
):
request = DataSyncRequest(
source_path=source_path,
destination_path=destination_path,
- source_path_prefix=source_path_prefix,
+ source_path_prefix=S3KeyPrefix(source_path_prefix) if source_path_prefix else None,
max_concurrency=max_concurrency,
retain_source_data=retain_source_data,
require_lock=require_lock,
+ force=force,
+ size_only=size_only,
)
return DataSyncOperations.sync_request(request=request)


def refresh_local_path__mtime(path: Path, min_mtime: Union[int, float]):
paths = find_all_paths(path, include_dirs=False, include_files=True)
- for path in paths:
-     path_stats = os.stat(path)
+ for subpath in paths:
+     path_stats = os.stat(subpath)
if path_stats.st_mtime < min_mtime:
- os.utime(path, times=(path_stats.st_atime, min_mtime))
+ os.utime(subpath, times=(path_stats.st_atime, min_mtime))
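
The loop-variable rename above is a shadowing fix: the old loop rebound the function's `path` argument inside the body, which made the code easy to misread even though each iteration still touched the correct file. A minimal usage sketch of the helper (the directory is hypothetical):

    import time
    from pathlib import Path

    # Ensure no file under the root reports an mtime older than one hour ago.
    refresh_local_path__mtime(Path("/tmp/dataset"), min_mtime=time.time() - 3600)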
64 changes: 49 additions & 15 deletions src/aibs_informatics_aws_utils/s3.py
@@ -374,7 +374,7 @@ def upload_folder(

@retry(ResponseStreamingError)
def upload_file(
- local_path: Path,
+ local_path: Union[str, Path],
s3_path: S3URI,
extra_args: Optional[Dict[str, Any]] = None,
transfer_config: Optional[TransferConfig] = None,
@@ -384,7 +384,7 @@
):
s3_client = get_s3_client(**kwargs)
if force or should_sync(
- source_path=local_path, destination_path=s3_path, size_only=size_only, **kwargs
+ source_path=Path(local_path), destination_path=s3_path, size_only=size_only, **kwargs
):
s3_client.upload_file(
Filename=str(local_path),
@@ -527,15 +527,19 @@ def get_s3_path_stats(s3_path: S3URI, **kwargs) -> S3PathStats:
)


+ # TODO: Two things
+ # 1. allow for a way to specify `size_only` for this function when transferring large number of files
+ # 2. add flag for failing if no source data exists.
def sync_paths(
source_path: Union[Path, S3URI],
destination_path: Union[Path, S3URI],
- source_path_prefix: str = None,
+ source_path_prefix: Optional[str] = None,
include: Optional[List[Pattern]] = None,
exclude: Optional[List[Pattern]] = None,
extra_args: Optional[Dict[str, Any]] = None,
transfer_config: Optional[TransferConfig] = None,
force: bool = False,
+ size_only: bool = False,
delete: bool = False,
**kwargs,
) -> List[S3TransferResponse]:
@@ -576,12 +580,17 @@ def sync_paths(
destination_path=destination_path,
source_path_prefix=source_path_prefix,
extra_args=extra_args,
- force=force,
)
for nested_source_path in nested_source_paths
]

- responses = process_transfer_requests(*requests, transfer_config=transfer_config, **kwargs)
+ responses = process_transfer_requests(
+     *requests,
+     transfer_config=transfer_config,
+     force=force,
+     size_only=size_only,
+     **kwargs,
+ )

if delete:
logger.info(f"Sync: checking for files to delete following sync")
@@ -613,7 +622,6 @@ def generate_transfer_request(
destination_path: Union[Path, S3URI],
source_path_prefix: Optional[str] = None,
extra_args: Optional[Dict[str, Any]] = None,
- force: bool = True,
) -> S3TransferRequest:
"""Create a S3 transfer request
@@ -641,26 +649,43 @@
# This will be sanitized by S3URI class (removing double slashes)
new_destination_path = S3URI(destination_path + relative_source_path)
if isinstance(source_path, S3URI):
- return S3CopyRequest(source_path, new_destination_path, force, extra_args=extra_args)
+ return S3CopyRequest(source_path, new_destination_path, extra_args=extra_args)
else:
- return S3UploadRequest(source_path, new_destination_path, force, extra_args=extra_args)
+ return S3UploadRequest(source_path, new_destination_path, extra_args=extra_args)
elif isinstance(source_path, S3URI) and isinstance(destination_path, Path):
local_destination_path: Path = (
Path(get_path_with_root(relative_source_path, destination_path))
if relative_source_path
else destination_path
)
- return S3DownloadRequest(source_path, local_destination_path, force)
+ return S3DownloadRequest(source_path, local_destination_path)
else:
raise ValueError("Local to local transfer is not supported")


def process_transfer_requests(
*transfer_requests: S3TransferRequest,
transfer_config: Optional[TransferConfig] = None,
+ force: bool = False,
+ size_only: bool = False,
suppress_errors: bool = False,
**kwargs,
) -> List[S3TransferResponse]:
"""Process a list of S3 transfer requests
Args:
transfer_config (Optional[TransferConfig]): transfer config.
Defaults to None.
force (bool): Whether to force the transfer.
Defaults to False.
size_only (bool): Whether to only check size when transferring.
Defaults to False.
suppress_errors (bool): Whether to suppress errors.
Defaults to False.
Returns:
List[S3TransferResponse]: List of transfer responses
"""
transfer_responses = []

for request in transfer_requests:
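
Because requests no longer carry a per-request `force` field, the same request list can be replayed under different sync semantics. A sketch, assuming the constructor shapes shown in this diff (bucket and keys hypothetical):

    requests = [
        S3CopyRequest(
            S3URI("s3://example-bucket/staging/a.txt"),
            S3URI("s3://example-bucket/release/a.txt"),
        ),
    ]
    process_transfer_requests(*requests, size_only=True)  # size-only comparison pass
    process_transfer_requests(*requests, force=True)      # unconditional overwrite pass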
@@ -671,7 +696,8 @@
destination_path=request.destination_path,
extra_args=request.extra_args,
transfer_config=transfer_config,
- force=request.force,
+ force=force,
+ size_only=size_only,
**kwargs,
)
elif isinstance(request, S3UploadRequest):
@@ -680,7 +706,8 @@
s3_path=request.destination_path,
extra_args=request.extra_args,
transfer_config=transfer_config,
- force=request.force,
+ force=force,
+ size_only=size_only,
**kwargs,
)
elif isinstance(request, S3DownloadRequest):
@@ -697,7 +724,8 @@
s3_path=request.source_path,
local_path=request.destination_path,
transfer_config=transfer_config,
- force=request.force,
+ force=force,
+ size_only=size_only,
**kwargs,
)
transfer_responses.append(S3TransferResponse(request, False))
@@ -716,7 +744,12 @@
copy_s3_path = sync_paths


- @retry(ClientError, [lambda ex: client_error_code_check(ex, "SlowDown")])
+ client_error_code_check__SlowDown: Callable[[Exception], bool] = lambda ex: isinstance(
+     ex, ClientError
+ ) and client_error_code_check(ex, "SlowDown")
+
+
+ @retry(ClientError, [client_error_code_check__SlowDown])
def copy_s3_object(
source_path: S3URI,
destination_path: S3URI,
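
Extracting the retry predicate into a named, typed callable makes the throttling check reusable and type-checkable. A sketch of reusing it on another throttled S3 call (delete_s3_object is hypothetical, and S3URI is assumed to expose bucket/key attributes; @retry, client_error_code_check, and get_s3_client are from this package):

    @retry(ClientError, [client_error_code_check__SlowDown])
    def delete_s3_object(s3_path: S3URI, **kwargs) -> None:
        # Retried only when S3 responds with a "SlowDown" throttling error.
        get_s3_client(**kwargs).delete_object(Bucket=s3_path.bucket, Key=s3_path.key)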
@@ -890,7 +923,7 @@ def list_s3_paths(
s3 = get_s3_client(**kwargs)

def match_results(value: str, patterns: List[Pattern]) -> List[bool]:
- return [_.match(value) for _ in patterns]
+ return [_.match(value) is not None for _ in patterns]

paginator = s3.get_paginator("list_objects_v2")

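The `is not None` fix makes match_results honor its declared List[bool] return type: Pattern.match returns Optional[Match], so the old version produced match objects and Nones rather than booleans. A quick stdlib illustration:

    import re

    patterns = [re.compile(r"^data/"), re.compile(r"\.csv$")]
    value = "data/table.csv"
    # match() anchors at the start of the string, so only the first pattern hits.
    assert [p.match(value) is not None for p in patterns] == [True, False]
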
@@ -1013,6 +1046,7 @@ def should_sync(
dest_local_path, multipart_chunk_size_bytes, multipart_threshold_bytes
)
else:
+ logger.warning(f"Destination path {destination_path} does not exist as a file or object.")
return True

if isinstance(source_path, S3URI) and is_object(source_path):
@@ -1156,7 +1190,7 @@ def update_s3_storage_class(
print(
f"debug: current storage class: {s3_obj.storage_class}, target: {target_storage_class}"
)
- current_storage_class = S3StorageClass.from_boto_s3_obj(s3_obj)
+ current_storage_class = S3StorageClass.from_boto_s3_obj(s3_obj)  # type: ignore[arg-type]
# Current storage class matches target: No-op
if current_storage_class == target_storage_class:
continue
