diff --git a/src/aibs_informatics_aws_utils/data_sync/operations.py b/src/aibs_informatics_aws_utils/data_sync/operations.py
index 06789d7..c7f6542 100644
--- a/src/aibs_informatics_aws_utils/data_sync/operations.py
+++ b/src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -57,7 +57,8 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
             destination_path=destination_path,
             transfer_config=self.s3_transfer_config,
             config=self.botocore_config,
-            force=False,
+            force=self.config.force,
+            size_only=self.config.size_only,
             delete=True,
         )
         if not self.config.retain_source_data:
@@ -93,7 +94,8 @@ def sync_paths_with_lock(*args, **kwargs):
             destination_path=destination_path,
             transfer_config=self.s3_transfer_config,
             config=self.botocore_config,
-            force=False,
+            force=self.config.force,
+            size_only=self.config.size_only,
             delete=True,
         )
 
@@ -131,7 +133,8 @@ def sync_s3_to_s3(
             source_path_prefix=source_path_prefix,
             transfer_config=self.s3_transfer_config,
             config=self.botocore_config,
-            force=False,
+            force=self.config.force,
+            size_only=self.config.size_only,
             delete=True,
         )
         if not self.config.retain_source_data:
@@ -195,21 +198,25 @@ def sync_data(
     max_concurrency: int = 10,
     retain_source_data: bool = True,
     require_lock: bool = False,
+    force: bool = False,
+    size_only: bool = False,
 ):
     request = DataSyncRequest(
         source_path=source_path,
         destination_path=destination_path,
-        source_path_prefix=source_path_prefix,
+        source_path_prefix=S3KeyPrefix(source_path_prefix) if source_path_prefix else None,
         max_concurrency=max_concurrency,
         retain_source_data=retain_source_data,
         require_lock=require_lock,
+        force=force,
+        size_only=size_only,
     )
     return DataSyncOperations.sync_request(request=request)
 
 
 def refresh_local_path__mtime(path: Path, min_mtime: Union[int, float]):
     paths = find_all_paths(path, include_dirs=False, include_files=True)
-    for path in paths:
-        path_stats = os.stat(path)
+    for subpath in paths:
+        path_stats = os.stat(subpath)
         if path_stats.st_mtime < min_mtime:
-            os.utime(path, times=(path_stats.st_atime, min_mtime))
+            os.utime(subpath, times=(path_stats.st_atime, min_mtime))
diff --git a/src/aibs_informatics_aws_utils/s3.py b/src/aibs_informatics_aws_utils/s3.py
index c9f188b..f114619 100644
--- a/src/aibs_informatics_aws_utils/s3.py
+++ b/src/aibs_informatics_aws_utils/s3.py
@@ -374,7 +374,7 @@ def upload_folder(
 
 @retry(ResponseStreamingError)
 def upload_file(
-    local_path: Path,
+    local_path: Union[str, Path],
     s3_path: S3URI,
     extra_args: Optional[Dict[str, Any]] = None,
     transfer_config: Optional[TransferConfig] = None,
@@ -384,7 +384,7 @@
 ):
     s3_client = get_s3_client(**kwargs)
     if force or should_sync(
-        source_path=local_path, destination_path=s3_path, size_only=size_only, **kwargs
+        source_path=Path(local_path), destination_path=s3_path, size_only=size_only, **kwargs
     ):
         s3_client.upload_file(
             Filename=str(local_path),
@@ -527,15 +527,19 @@ def get_s3_path_stats(s3_path: S3URI, **kwargs) -> S3PathStats:
     )
 
 
+# TODO: Two things
+# 1. allow for a way to specify `size_only` for this function when transferring a large number of files
+# 2. add flag for failing if no source data exists.
 def sync_paths(
     source_path: Union[Path, S3URI],
     destination_path: Union[Path, S3URI],
-    source_path_prefix: str = None,
+    source_path_prefix: Optional[str] = None,
     include: Optional[List[Pattern]] = None,
     exclude: Optional[List[Pattern]] = None,
     extra_args: Optional[Dict[str, Any]] = None,
     transfer_config: Optional[TransferConfig] = None,
     force: bool = False,
+    size_only: bool = False,
     delete: bool = False,
     **kwargs,
 ) -> List[S3TransferResponse]:
@@ -576,12 +580,17 @@ def sync_paths(
             destination_path=destination_path,
             source_path_prefix=source_path_prefix,
             extra_args=extra_args,
-            force=force,
         )
         for nested_source_path in nested_source_paths
     ]
 
-    responses = process_transfer_requests(*requests, transfer_config=transfer_config, **kwargs)
+    responses = process_transfer_requests(
+        *requests,
+        transfer_config=transfer_config,
+        force=force,
+        size_only=size_only,
+        **kwargs,
+    )
 
     if delete:
         logger.info(f"Sync: checking for files to delete following sync")
@@ -613,7 +622,6 @@ def generate_transfer_request(
     destination_path: Union[Path, S3URI],
     source_path_prefix: Optional[str] = None,
     extra_args: Optional[Dict[str, Any]] = None,
-    force: bool = True,
 ) -> S3TransferRequest:
     """Create a S3 transfer request
 
@@ -641,16 +649,16 @@ def generate_transfer_request(
         # This will be sanitized by S3URI class (removing double slashes)
         new_destination_path = S3URI(destination_path + relative_source_path)
         if isinstance(source_path, S3URI):
-            return S3CopyRequest(source_path, new_destination_path, force, extra_args=extra_args)
+            return S3CopyRequest(source_path, new_destination_path, extra_args=extra_args)
         else:
-            return S3UploadRequest(source_path, new_destination_path, force, extra_args=extra_args)
+            return S3UploadRequest(source_path, new_destination_path, extra_args=extra_args)
     elif isinstance(source_path, S3URI) and isinstance(destination_path, Path):
         local_destination_path: Path = (
             Path(get_path_with_root(relative_source_path, destination_path))
             if relative_source_path
             else destination_path
         )
-        return S3DownloadRequest(source_path, local_destination_path, force)
+        return S3DownloadRequest(source_path, local_destination_path)
     else:
         raise ValueError("Local to local transfer is not supported")
 
@@ -658,9 +666,26 @@ def process_transfer_requests(
     *transfer_requests: S3TransferRequest,
     transfer_config: Optional[TransferConfig] = None,
+    force: bool = False,
+    size_only: bool = False,
     suppress_errors: bool = False,
     **kwargs,
 ) -> List[S3TransferResponse]:
+    """Process a list of S3 transfer requests
+
+    Args:
+        transfer_config (Optional[TransferConfig]): transfer config.
+            Defaults to None.
+        force (bool): Whether to force the transfer.
+            Defaults to False.
+        size_only (bool): Whether to only check size when transferring.
+            Defaults to False.
+        suppress_errors (bool): Whether to suppress errors.
+            Defaults to False.
+
+    Returns:
+        List[S3TransferResponse]: List of transfer responses
+    """
     transfer_responses = []
 
     for request in transfer_requests:
@@ -671,7 +696,8 @@ def process_transfer_requests(
                     destination_path=request.destination_path,
                     extra_args=request.extra_args,
                     transfer_config=transfer_config,
-                    force=request.force,
+                    force=force,
+                    size_only=size_only,
                     **kwargs,
                 )
             elif isinstance(request, S3UploadRequest):
@@ -680,7 +706,8 @@ def process_transfer_requests(
                     s3_path=request.destination_path,
                     extra_args=request.extra_args,
                     transfer_config=transfer_config,
-                    force=request.force,
+                    force=force,
+                    size_only=size_only,
                     **kwargs,
                 )
             elif isinstance(request, S3DownloadRequest):
@@ -697,7 +724,8 @@ def process_transfer_requests(
                     s3_path=request.source_path,
                     local_path=request.destination_path,
                     transfer_config=transfer_config,
-                    force=request.force,
+                    force=force,
+                    size_only=size_only,
                     **kwargs,
                 )
             transfer_responses.append(S3TransferResponse(request, False))
@@ -716,7 +744,12 @@ def process_transfer_requests(
 copy_s3_path = sync_paths
 
 
-@retry(ClientError, [lambda ex: client_error_code_check(ex, "SlowDown")])
+client_error_code_check__SlowDown: Callable[[Exception], bool] = lambda ex: isinstance(
+    ex, ClientError
+) and client_error_code_check(ex, "SlowDown")
+
+
+@retry(ClientError, [client_error_code_check__SlowDown])
 def copy_s3_object(
     source_path: S3URI,
     destination_path: S3URI,
@@ -890,7 +923,7 @@ def list_s3_paths(
     s3 = get_s3_client(**kwargs)
 
     def match_results(value: str, patterns: List[Pattern]) -> List[bool]:
-        return [_.match(value) for _ in patterns]
+        return [_.match(value) is not None for _ in patterns]
 
     paginator = s3.get_paginator("list_objects_v2")
 
@@ -1013,6 +1046,7 @@ def should_sync(
             dest_local_path, multipart_chunk_size_bytes, multipart_threshold_bytes
         )
     else:
+        logger.warning(f"Destination path {destination_path} does not exist as a file or object.")
         return True
 
     if isinstance(source_path, S3URI) and is_object(source_path):
@@ -1156,7 +1190,7 @@ def update_s3_storage_class(
         print(
             f"debug: current storage class: {s3_obj.storage_class}, target: {target_storage_class}"
         )
-        current_storage_class = S3StorageClass.from_boto_s3_obj(s3_obj)
+        current_storage_class = S3StorageClass.from_boto_s3_obj(s3_obj)  # type: ignore[arg-type]
         # Current storage class matches target: No-op
         if current_storage_class == target_storage_class:
             continue
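
With this change, size-only comparison and forced transfers can be requested end to end through sync_data. A minimal usage sketch follows; the local directory, bucket name, and use of plain strings for the path arguments are illustrative assumptions (the library may expect its own LocalPath/S3URI types):

    from aibs_informatics_aws_utils.data_sync.operations import sync_data

    # Skip files whose size already matches the destination (size_only=True)
    # and do not force re-transfer of objects that appear to be in sync.
    # The source directory and bucket name below are hypothetical.
    sync_data(
        source_path="/tmp/results",
        destination_path="s3://example-bucket/results/",
        size_only=True,
        force=False,
        retain_source_data=True,
    )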