Skip to content

Commit

Permalink
Deduplication of dataset {path}/{item_name}s pre-upload (#937)
Browse files Browse the repository at this point in the history
  • Loading branch information
JBWilkie authored Oct 10, 2024
1 parent 25aff99 commit acb371d
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 69 deletions.
55 changes: 53 additions & 2 deletions darwin/dataset/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
Tuple,
Dict,
)

from rich.console import Console
import requests

from darwin.datatypes import PathLike, Slot, SourceFile
from darwin.doc_enum import DocEnum
from darwin.path_utils import construct_full_path
from darwin.utils import chunk
from darwin.utils.utils import is_image_extension_allowed_by_filename, SLOTS_GRID_MAP
from darwin.importer.importer import _console_theme

if TYPE_CHECKING:
from darwin.client import Client
Expand Down Expand Up @@ -198,7 +199,7 @@ def serialize_darwin_json_v2(self):

@property
def full_path(self) -> str:
    """The full ``Path`` (with filename included) to the item."""
    # Joins the remote directory path and the filename into one
    # "{item_path}/{item_name}" string used for remote-duplicate checks.
    return construct_full_path(self.data["path"], self.data["filename"])


Expand Down Expand Up @@ -278,6 +279,11 @@ def serialize_darwin_json_v2(self):

return {"slots": slots, "layout": self.layout, "name": self.name, "path": "/"}

@property
def full_path(self) -> str:
    """The full ``Path`` (with filename included) to the item"""
    # Multi-file items always live at the dataset root, so the remote
    # path is simply the item name prefixed with "/".
    root = "/"
    return root + self.name


class FileMonitor(object):
"""
Expand Down Expand Up @@ -384,6 +390,7 @@ def __init__(
self.local_files = local_files
self.dataset: RemoteDataset = dataset
self.errors: List[UploadRequestError] = []
self.skip_existing_full_remote_filepaths()
self.blocked_items, self.pending_items = self._request_upload()

@staticmethod
Expand Down Expand Up @@ -425,6 +432,50 @@ def progress(self):
"""Current level of upload progress."""
return self._progress

def skip_existing_full_remote_filepaths(self) -> None:
    """
    Checks if any items to be uploaded have duplicate {item_path}/{item_name} with
    items already present in the remote dataset. Skip these files and display
    a warning for each one.

    Mutates ``self.local_files`` and ``self.multi_file_items`` in place,
    dropping every queued item whose full remote path is already occupied.
    """
    console = Console(theme=_console_theme())
    # Build a set so each duplicate check below is O(1) instead of a
    # linear scan of the remote dataset's file list per queued item.
    full_remote_filepaths = {
        Path(file.full_path) for file in self.dataset.fetch_remote_files()
    }

    multi_file_items_to_remove = []
    local_files_to_remove = []

    if self.multi_file_items:
        for multi_file_item in self.multi_file_items:
            if Path(multi_file_item.full_path) in full_remote_filepaths:
                # Skip every local file belonging to the clashing item,
                # not just the item record itself.
                local_files_to_remove.extend(multi_file_item.files)
                multi_file_items_to_remove.append(multi_file_item)
                console.print(
                    f"The remote filepath {multi_file_item.full_path} is already occupied by a dataset item in the {self.dataset.slug} dataset. Skipping upload of item.",
                    style="warning",
                )
    if self.local_files:
        for local_file in self.local_files:
            if Path(local_file.full_path) in full_remote_filepaths:
                local_files_to_remove.append(local_file)
                console.print(
                    f"The remote filepath {local_file.full_path} already exists in the {self.dataset.slug} dataset. Skipping upload of item.",
                    style="warning",
                )

    self.local_files = [
        local_file
        for local_file in self.local_files
        if local_file not in local_files_to_remove
    ]
    if self.multi_file_items:
        self.multi_file_items = [
            multi_file_item
            for multi_file_item in self.multi_file_items
            if multi_file_item not in multi_file_items_to_remove
        ]

def prepare_upload(
self,
) -> Optional[Iterator[Callable[[Optional[ByteReadCallback]], None]]]:
Expand Down
114 changes: 60 additions & 54 deletions tests/darwin/cli_functions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,35 +112,38 @@ def test_default_non_verbose(
with patch.object(
Client, "get_remote_dataset", return_value=remote_dataset
) as get_remote_dataset_mock:
with patch.object(Console, "print", return_value=None) as print_mock:
upload_data(
f"{team_slug_darwin_json_v2}/{dataset_slug}",
["test_1.jpg", "test_2.jpg", "test_3.jpg"],
[],
0,
None,
False,
False,
False,
False,
)
get_remote_dataset_mock.assert_called_once()
with patch.object(remote_dataset, "fetch_remote_files", return_value=[]):
with patch.object(Console, "print", return_value=None) as print_mock:
upload_data(
f"{team_slug_darwin_json_v2}/{dataset_slug}",
["test_1.jpg", "test_2.jpg", "test_3.jpg"],
[],
0,
None,
False,
False,
False,
False,
)
get_remote_dataset_mock.assert_called_once()

assert (
call("Skipped 1 files already in the dataset.\n", style="warning")
in print_mock.call_args_list
)
assert (
call(
"2 files couldn't be uploaded because an error occurred.\n",
style="error",
assert (
call(
"Skipped 1 files already in the dataset.\n", style="warning"
)
in print_mock.call_args_list
)
assert (
call(
"2 files couldn't be uploaded because an error occurred.\n",
style="error",
)
in print_mock.call_args_list
)
assert (
call('Re-run with "--verbose" for further details')
in print_mock.call_args_list
)
in print_mock.call_args_list
)
assert (
call('Re-run with "--verbose" for further details')
in print_mock.call_args_list
)

@pytest.mark.usefixtures("file_read_write_test")
@responses.activate
Expand Down Expand Up @@ -215,35 +218,38 @@ def test_with_verbose_flag(
with patch.object(
Client, "get_remote_dataset", return_value=remote_dataset
) as get_remote_dataset_mock:
with patch.object(Console, "print", return_value=None) as print_mock:
upload_data(
f"{team_slug_darwin_json_v2}/{dataset_slug}",
["test_1.jpg", "test_2.jpg", "test_3.jpg"],
[],
0,
None,
None,
False,
False,
True,
)
get_remote_dataset_mock.assert_called_once()
with patch.object(remote_dataset, "fetch_remote_files", return_value=[]):
with patch.object(Console, "print", return_value=None) as print_mock:
upload_data(
f"{team_slug_darwin_json_v2}/{dataset_slug}",
["test_1.jpg", "test_2.jpg", "test_3.jpg"],
[],
0,
None,
None,
False,
False,
True,
)
get_remote_dataset_mock.assert_called_once()

assert (
call("Skipped 1 files already in the dataset.\n", style="warning")
in print_mock.call_args_list
)
assert (
call(
"2 files couldn't be uploaded because an error occurred.\n",
style="error",
assert (
call(
"Skipped 1 files already in the dataset.\n", style="warning"
)
in print_mock.call_args_list
)
assert (
call(
"2 files couldn't be uploaded because an error occurred.\n",
style="error",
)
in print_mock.call_args_list
)
assert (
call('Re-run with "--verbose" for further details')
not in print_mock.call_args_list
)
in print_mock.call_args_list
)
assert (
call('Re-run with "--verbose" for further details')
not in print_mock.call_args_list
)


class TestSetFileStatus:
Expand Down
16 changes: 10 additions & 6 deletions tests/darwin/dataset/remote_dataset_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,15 +714,18 @@ def test_raises_if_both_as_frames_and_local_files_are_given(
remote_dataset.push([LocalFile("test.jpg")], as_frames=True)

def test_works_with_local_files_list(self, remote_dataset: RemoteDataset):
assert_upload_mocks_are_correctly_called(
remote_dataset, [LocalFile("test.jpg")]
)
with patch.object(remote_dataset, "fetch_remote_files", return_value=[]):
assert_upload_mocks_are_correctly_called(
remote_dataset, [LocalFile("test.jpg")]
)

def test_works_with_path_list(self, remote_dataset: RemoteDataset):
assert_upload_mocks_are_correctly_called(remote_dataset, [Path("test.jpg")])
with patch.object(remote_dataset, "fetch_remote_files", return_value=[]):
assert_upload_mocks_are_correctly_called(remote_dataset, [Path("test.jpg")])

def test_works_with_str_list(self, remote_dataset: RemoteDataset):
assert_upload_mocks_are_correctly_called(remote_dataset, ["test.jpg"])
with patch.object(remote_dataset, "fetch_remote_files", return_value=[]):
assert_upload_mocks_are_correctly_called(remote_dataset, ["test.jpg"])

def test_works_with_supported_files(self, remote_dataset: RemoteDataset):
supported_extensions = [
Expand All @@ -743,7 +746,8 @@ def test_works_with_supported_files(self, remote_dataset: RemoteDataset):
".ndpi",
]
filenames = [f"test{extension}" for extension in supported_extensions]
assert_upload_mocks_are_correctly_called(remote_dataset, filenames)
with patch.object(remote_dataset, "fetch_remote_files", return_value=[]):
assert_upload_mocks_are_correctly_called(remote_dataset, filenames)

def test_raises_with_unsupported_files(self, remote_dataset: RemoteDataset):
with pytest.raises(UnsupportedFileType):
Expand Down
74 changes: 67 additions & 7 deletions tests/darwin/dataset/upload_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from darwin.dataset.upload_manager import (
LocalFile,
UploadHandler,
UploadHandlerV2,
UploadStage,
_upload_chunk_size,
)
Expand Down Expand Up @@ -61,7 +62,8 @@ def dataset(
def test_request_upload_is_not_called_on_init(
dataset: RemoteDataset, request_upload_endpoint: str
):
upload_handler = UploadHandler.build(dataset, [])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [])

assert upload_handler.pending_count == 0
assert upload_handler.blocked_count == 0
Expand Down Expand Up @@ -97,7 +99,8 @@ def test_pending_count_is_correct(dataset: RemoteDataset, request_upload_endpoin
responses.add(responses.POST, request_upload_endpoint, json=response, status=200)

local_file = LocalFile(local_path=Path("test.jpg"))
upload_handler = UploadHandler.build(dataset, [local_file])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [local_file])

assert upload_handler.pending_count == 1
assert upload_handler.blocked_count == 0
Expand Down Expand Up @@ -139,7 +142,8 @@ def test_blocked_count_is_correct(dataset: RemoteDataset, request_upload_endpoin
responses.add(responses.POST, request_upload_endpoint, json=response, status=200)

local_file = LocalFile(local_path=Path("test.jpg"))
upload_handler = UploadHandler.build(dataset, [local_file])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [local_file])

assert upload_handler.pending_count == 0
assert upload_handler.blocked_count == 1
Expand Down Expand Up @@ -193,7 +197,8 @@ def test_error_count_is_correct_on_signature_request(
responses.add(responses.GET, sign_upload_endpoint, status=500)

local_file = LocalFile(local_path=Path("test.jpg"))
upload_handler = UploadHandler.build(dataset, [local_file])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [local_file])

upload_handler.upload()
for file_to_upload in upload_handler.progress:
Expand Down Expand Up @@ -259,7 +264,8 @@ def test_error_count_is_correct_on_upload_to_s3(

Path("test.jpg").touch()
local_file = LocalFile(local_path=Path("test.jpg"))
upload_handler = UploadHandler.build(dataset, [local_file])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [local_file])

upload_handler.upload()
for file_to_upload in upload_handler.progress:
Expand Down Expand Up @@ -326,7 +332,8 @@ def test_error_count_is_correct_on_confirm_upload(

Path("test.jpg").touch()
local_file = LocalFile(local_path=Path("test.jpg"))
upload_handler = UploadHandler.build(dataset, [local_file])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [local_file])

upload_handler.upload()
for file_to_upload in upload_handler.progress:
Expand Down Expand Up @@ -391,7 +398,8 @@ def test_upload_files(dataset: RemoteDataset, request_upload_endpoint: str):

Path("test.jpg").touch()
local_file = LocalFile(local_path=Path("test.jpg"))
upload_handler = UploadHandler.build(dataset, [local_file])
with patch.object(dataset, "fetch_remote_files", return_value=[]):
upload_handler = UploadHandler.build(dataset, [local_file])

upload_handler.upload()
for file_to_upload in upload_handler.progress:
Expand All @@ -418,3 +426,55 @@ def test_default_value_when_env_var_is_not_integer(self, mock: MagicMock):
def test_value_specified_by_env_var(self, mock: MagicMock):
assert _upload_chunk_size() == 123
mock.assert_called_once_with("DARWIN_UPLOAD_CHUNK_SIZE")


def test_skip_existing_full_remote_filepaths_with_local_files():
    """A local file whose remote path is already taken is skipped; others survive."""
    dataset = MagicMock()
    dataset.slug = "test-dataset"
    dataset.fetch_remote_files.return_value = [
        MagicMock(full_path=remote_path)
        for remote_path in ("/existing_file_1.jpg", "/existing_file_2.jpg")
    ]

    duplicate_file = MagicMock(full_path="/existing_file_1.jpg")
    fresh_file = MagicMock(full_path="/new_file.jpg")

    with patch("darwin.dataset.upload_manager.Console.print") as print_spy:
        handler = UploadHandlerV2(dataset, [duplicate_file, fresh_file], [])

    # Only the clashing file is dropped from the upload queue.
    assert duplicate_file not in handler.local_files
    assert fresh_file in handler.local_files

    print_spy.assert_any_call(
        "The remote filepath /existing_file_1.jpg already exists in the test-dataset dataset. Skipping upload of item.",
        style="warning",
    )


def test_skip_existing_full_remote_filepaths_with_multi_file_items():
    """A multi-file item whose remote path is already taken is skipped; others survive."""
    dataset = MagicMock()
    dataset.slug = "test-dataset"
    dataset.fetch_remote_files.return_value = [
        MagicMock(full_path="/existing_multi_file_item.jpg")
    ]

    clashing_item = MagicMock(
        full_path="/existing_multi_file_item.jpg", files=[MagicMock()]
    )
    fresh_item = MagicMock(
        full_path="/new_multi_file_item.jpg", files=[MagicMock()]
    )

    with patch("darwin.dataset.upload_manager.Console.print") as print_spy:
        handler = UploadHandlerV2(dataset, [], [clashing_item, fresh_item])

    # Only the clashing multi-file item is dropped from the upload queue.
    assert clashing_item not in handler.multi_file_items
    assert fresh_item in handler.multi_file_items

    # Verify that the correct warning was printed
    print_spy.assert_any_call(
        "The remote filepath /existing_multi_file_item.jpg is already occupied by a dataset item in the test-dataset dataset. Skipping upload of item.",
        style="warning",
    )
Loading

0 comments on commit acb371d

Please sign in to comment.