OCSDV-343: Resolve lingering part files when S3 -> EFS transfers fail
This commit changes the behavior of the `DataSyncOperations.sync_s3_to_local()`
method so that when it downloads a discrete S3 object to a local
filesystem (usually EFS), it first downloads the object to a custom
temporary location on the same filesystem as the destination and then
`os.rename`s it to the final destination path.

When Boto3 downloads an S3 object, it creates a temporary version of
the file in the SAME parent directory as the destination path
(e.g. downloading to "/file_system/dst/path/input_data.fastq.gz"
results in a temporary file at
"/file_system/dst/path/input_data.fastq.gz.{unique_hash}").

This default Boto3 behavior is normally fine, but it becomes
problematic when two specific things happen together:

1. A data sync is interrupted in a way that leaves Boto3 no time to
   clean up the temporary file

   - Subsequent runs of the data sync also can't clean up the file
     because our data sync logic only knows about syncing the single
     object and has no knowledge of the lingering temporary file (from
     a previous sync attempt)

2. A scientific executable that we need to support (e.g. cellranger)
   is excessively greedy when looking for putative FASTQ input files and
   will even grab partial files with names like `*.fastq.gz.6eF5b5da`
   (see the sketch after this list).
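A hypothetical illustration of that second failure mode, assuming a
tool that globs as loosely as `*.fastq.gz*`:

```python
# Greedy matching anchored only on ".fastq.gz" picks up the leftover
# partial file from an interrupted sync alongside the real input.
from pathlib import Path

putative_fastqs = sorted(Path("/file_system/dst/path").glob("*.fastq.gz*"))
# -> [.../input_data.fastq.gz, .../input_data.fastq.gz.6eF5b5da]
# The truncated partial file is then treated as a putative FASTQ input.
```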

By saving partial files to a different location during the Boto3 object
download and atomically moving only files whose transfer has completed,
we avoid this situation.
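In essence, the new logic follows this pattern (a simplified sketch of
what `sync_s3_to_local()` now does; the helper name is hypothetical):

```python
import os
import tempfile
from pathlib import Path
from typing import Callable

def download_atomically(
    download_fn: Callable[[Path], None], destination_path: Path, tmp_root: Path
) -> None:
    # `tmp_root` must live on the SAME filesystem as `destination_path`:
    # os.rename cannot cross filesystem boundaries (it raises EXDEV).
    with tempfile.TemporaryDirectory(dir=tmp_root) as tmp_dir:
        tmp_destination = Path(tmp_dir) / destination_path.name
        download_fn(tmp_destination)  # any partial file stays inside tmp_dir
        destination_path.parent.mkdir(parents=True, exist_ok=True)
        # Atomic on POSIX: readers see either no file or the complete file.
        os.rename(src=tmp_destination, dst=destination_path)
```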
njmei committed Nov 7, 2024
1 parent 48344bb commit 8630cd0
Showing 2 changed files with 85 additions and 15 deletions.
72 changes: 57 additions & 15 deletions src/aibs_informatics_aws_utils/data_sync/operations.py
@@ -1,18 +1,25 @@
import functools
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Union, cast

from aibs_informatics_core.models.aws.efs import EFSPath
from aibs_informatics_core.models.aws.s3 import S3URI, S3KeyPrefix
from aibs_informatics_core.models.data_sync import DataSyncConfig, DataSyncRequest, DataSyncTask
from aibs_informatics_core.models.data_sync import (
DataSyncConfig,
DataSyncRequest,
DataSyncTask,
RemoteToLocalConfig,
)
from aibs_informatics_core.utils.decorators import retry
from aibs_informatics_core.utils.file_operations import (
CannotAcquirePathLockError,
PathLock,
copy_path,
find_filesystem_boundary,
move_path,
remove_path,
)
@@ -80,8 +87,10 @@ def sync_local_to_s3(self, source_path: LocalPath, destination_path: S3URI):
def sync_s3_to_local(self, source_path: S3URI, destination_path: LocalPath):
self.logger.info(f"Downloading s3 content from {source_path} -> {destination_path}")
start_time = datetime.now(tz=timezone.utc)
destination_path = self.sanitize_local_path(destination_path)
source_is_object = is_object(source_path)

if not is_object(source_path) and not is_folder(source_path):
if not source_is_object and not is_folder(source_path):
message = f"S3 path {source_path} does not exist as object or folder"
if self.config.fail_if_missing:
raise FileNotFoundError(message)
@@ -107,17 +116,48 @@ def sync_paths_with_lock(*args, **kwargs):

_sync_paths = sync_paths_with_lock

destination_path = self.sanitize_local_path(destination_path)

_sync_paths(
source_path=source_path,
destination_path=destination_path,
transfer_config=self.s3_transfer_config,
config=self.botocore_config,
force=self.config.force,
size_only=self.config.size_only,
delete=True,
)
remote_to_local_config = self.config.remote_to_local_config
if source_is_object and remote_to_local_config.use_custom_tmp_dir:
# If our source is an s3 object (not prefix) and we want to use custom object
# download logic (default True), then we save s3 objects to a temporary location
# that is on the SAME file system.
#
# This is necessary because if the normal boto3 download gets interrupted in a
# catastrophic way that prevents built-in cleanup strategies, it leaves
# a 'partial' file (e.g. `*.6eF5b5da`) that resides in the SAME parent directory
# as the actual intended destination path. This 'partial' file can be picked up by
# some scientific executables (e.g. cellranger) and interpreted as an invalid input
if remote_to_local_config.custom_tmp_dir is None:
custom_tmp_dir = find_filesystem_boundary(destination_path)
elif isinstance(remote_to_local_config.custom_tmp_dir, EFSPath):
custom_tmp_dir = self.sanitize_local_path(remote_to_local_config.custom_tmp_dir)
else:
custom_tmp_dir = remote_to_local_config.custom_tmp_dir

with tempfile.TemporaryDirectory(dir=custom_tmp_dir) as tmp_dir:
tmp_destination_path = Path(tmp_dir) / destination_path.name
_sync_paths(
source_path=source_path,
destination_path=tmp_destination_path,
transfer_config=self.s3_transfer_config,
config=self.botocore_config,
force=self.config.force,
size_only=self.config.size_only,
)
destination_path.parent.mkdir(parents=True, exist_ok=True)
os.rename(src=tmp_destination_path, dst=destination_path)
else:
# If our source is a prefix, then _sync_paths has builtin logic to deal with deleting
# excess files in the destination dir that do not match the source prefix layout.
_sync_paths(
source_path=source_path,
destination_path=destination_path,
transfer_config=self.s3_transfer_config,
config=self.botocore_config,
force=self.config.force,
size_only=self.config.size_only,
delete=True,
)

self.logger.info(f"Updating last modified time on local files to at least {start_time}")
refresh_local_path__mtime(destination_path, start_time.timestamp())
@@ -221,8 +261,8 @@ def sanitize_local_path(self, path: Union[EFSPath, Path]) -> Path:

@classmethod
def sync_request(cls, request: DataSyncRequest):
sync_operations = cls(request)
sync_operations.sync_task(task=request)
sync_operations = cls(config=request.config)
sync_operations.sync_task(task=request.task)


# We should consider using cloudpathlib[s3] in the future
@@ -236,6 +276,7 @@ def sync_data(
force: bool = False,
size_only: bool = False,
fail_if_missing: bool = True,
remote_to_local_config: Optional[RemoteToLocalConfig] = None,
):
request = DataSyncRequest(
source_path=source_path,
@@ -247,6 +288,7 @@
force=force,
size_only=size_only,
fail_if_missing=fail_if_missing,
remote_to_local_config=remote_to_local_config or RemoteToLocalConfig(),
)
return DataSyncOperations.sync_request(request=request)

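For reference, a hypothetical call that opts out of the new behavior
(the bucket name and paths are made up; all other `sync_data`
parameters keep their defaults):

```python
from pathlib import Path

from aibs_informatics_core.models.aws.s3 import S3URI
from aibs_informatics_core.models.data_sync import RemoteToLocalConfig

from aibs_informatics_aws_utils.data_sync.operations import sync_data

sync_data(
    source_path=S3URI("s3://my-bucket/input_data.fastq.gz"),
    destination_path=Path("/file_system/dst/path/input_data.fastq.gz"),
    # Fall back to plain Boto3 download semantics (partial file next to dst):
    remote_to_local_config=RemoteToLocalConfig(use_custom_tmp_dir=False),
)
```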
28 changes: 28 additions & 0 deletions test/aibs_informatics_aws_utils/data_sync/test_operations.py
@@ -4,6 +4,7 @@

import moto
from aibs_informatics_core.models.aws.s3 import S3URI
from aibs_informatics_core.models.data_sync import RemoteToLocalConfig
from aibs_informatics_core.utils.os_operations import find_all_paths

from aibs_informatics_aws_utils.data_sync.operations import sync_data
@@ -268,6 +269,33 @@ def test__sync_data__s3_to_local__file__does_not_exist(self):
)
assert not destination_path.exists()

def test__sync_data__s3_to_local__file__auto_custom_tmp_dir__succeeds(self):
fs = self.setUpLocalFS()
self.setUpBucket()
source_path = self.put_object("source", "hello")
destination_path = fs / "destination"
sync_data(
source_path=source_path,
destination_path=destination_path,
remote_to_local_config=RemoteToLocalConfig(use_custom_tmp_dir=True),
)
self.assertPathsEqual(source_path, destination_path, 1)

def test__sync_data__s3_to_local__file__specified_custom_tmp_dir__succeeds(self):
fs = self.setUpLocalFS()
self.setUpBucket()
source_path = self.put_object("source", "hello")
destination_path = fs / "destination"
sync_data(
source_path=source_path,
destination_path=destination_path,
remote_to_local_config=RemoteToLocalConfig(
use_custom_tmp_dir=True,
custom_tmp_dir=fs,
),
)
self.assertPathsEqual(source_path, destination_path, 1)

def test__sync_data__local_to_s3__folder__succeeds(self):
fs = self.setUpLocalFS()
self.setUpBucket()
