Make FileDatastore ignore datastore records in some cases.
Datastore records in DatasetRef can become invalidated in some cases;
this is mostly true for our unit tests, which do crazy things to test
consistency. This patch adds a flag parameter to some methods that
tells them to ignore ref-supplied datastore records.
andy-slac committed Oct 11, 2023
1 parent 951b861 commit 1c5ee3e
Showing 1 changed file with 31 additions and 13 deletions.
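
To make the behavior concrete, here is a minimal, self-contained sketch of the pattern this patch applies. FakeRef, FakeTable, and get_stored_items_info are hypothetical stand-ins for DatasetRef, the datastore's opaque records table, and getStoredItemsInfo; only the flag-controlled fallback mirrors the actual diff below.

from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeRef:
    """Hypothetical stand-in for DatasetRef; may carry cached datastore records."""

    id: int
    datastore_records: list[str] | None = None


@dataclass
class FakeTable:
    """Hypothetical stand-in for the datastore's opaque records table."""

    rows: dict[int, list[str]] = field(default_factory=dict)

    def fetch(self, dataset_id: int) -> list[str]:
        return self.rows.get(dataset_id, [])


def get_stored_items_info(
    table: FakeTable, ref: FakeRef, ignore_datastore_records: bool = False
) -> list[str]:
    # Prefer the records the ref carries, unless the caller distrusts them;
    # in that case (or when the ref carries none) query the table instead.
    if ref.datastore_records is not None and not ignore_datastore_records:
        return ref.datastore_records
    return table.fetch(ref.id)


# A ref can outlive its dataset: here the table has no row for it, but the
# ref still carries the records it was created with.
table = FakeTable()
stale = FakeRef(id=1, datastore_records=["file.fits"])
assert get_stored_items_info(table, stale) == ["file.fits"]  # trusts stale cache
assert get_stored_items_info(table, stale, ignore_datastore_records=True) == []
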
python/lsst/daf/butler/datastores/fileDatastore.py (44 changes: 31 additions & 13 deletions)
@@ -410,14 +410,18 @@ def addStoredItemInfo(
             case _:
                 raise ValueError(f"Unknown insert mode of '{insert_mode}'")
 
-    def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
+    def getStoredItemsInfo(
+        self, ref: DatasetIdRef, ignore_datastore_records: bool = False
+    ) -> list[StoredFileInfo]:
         """Retrieve information associated with files stored in this
         `Datastore` associated with this dataset ref.
 
         Parameters
         ----------
         ref : `DatasetRef`
             The dataset that is to be queried.
+        ignore_datastore_records : `bool`
+            If `True` then do not use datastore records stored in refs.
 
         Returns
         -------
@@ -428,7 +432,7 @@ def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
             list if no matching datasets can be found.
         """
         # Try to get them from the ref first.
-        if ref._datastore_records is not None:
+        if ref._datastore_records is not None and not ignore_datastore_records:
             if (ref_records := ref._datastore_records.get(self._table.name)) is not None:
                 # Need to make sure they have correct type.
                 for record in ref_records:
@@ -480,14 +484,16 @@ def _register_datasets(
         self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)
 
     def _get_stored_records_associated_with_refs(
-        self, refs: Iterable[DatasetIdRef]
+        self, refs: Iterable[DatasetIdRef], ignore_datastore_records: bool = False
     ) -> dict[DatasetId, list[StoredFileInfo]]:
         """Retrieve all records associated with the provided refs.
 
         Parameters
         ----------
         refs : iterable of `DatasetIdRef`
             The refs for which records are to be retrieved.
+        ignore_datastore_records : `bool`
+            If `True` then do not use datastore records stored in refs.
 
         Returns
         -------
@@ -499,7 +505,7 @@ def _get_stored_records_associated_with_refs(
         records_by_ref: defaultdict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
         refs_with_no_records = []
         for ref in refs:
-            if ref._datastore_records is None:
+            if ignore_datastore_records or ref._datastore_records is None:
                 refs_with_no_records.append(ref)
             else:
                 if (ref_records := ref._datastore_records.get(self._table.name)) is not None:
@@ -569,14 +575,18 @@ def removeStoredItemInfo(self, ref: DatasetIdRef) -> None:
         # tests that check that this method works, so we keep it for now.
         self._table.delete(["dataset_id"], {"dataset_id": ref.id})
 
-    def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, StoredFileInfo]]:
+    def _get_dataset_locations_info(
+        self, ref: DatasetIdRef, ignore_datastore_records: bool = False
+    ) -> list[tuple[Location, StoredFileInfo]]:
         r"""Find all the `Location`\ s of the requested dataset in the
         `Datastore` and the associated stored file information.
 
         Parameters
         ----------
         ref : `DatasetRef`
             Reference to the required `Dataset`.
+        ignore_datastore_records : `bool`
+            If `True` then do not use datastore records stored in refs.
 
         Returns
         -------
@@ -585,7 +595,7 @@ def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location,
             stored information about each file and its formatter.
         """
         # Get the file information (this will fail if no file)
-        records = self.getStoredItemsInfo(ref)
+        records = self.getStoredItemsInfo(ref, ignore_datastore_records)
 
         # Use the path to determine the location -- we need to take
         # into account absolute URIs in the datastore record
@@ -1518,7 +1528,9 @@ def knows(self, ref: DatasetRef) -> bool:
         exists : `bool`
             `True` if the dataset is known to the datastore.
         """
-        fileLocations = self._get_dataset_locations_info(ref)
+        # We cannot trust datastore records from ref, as many unit tests delete
+        # datasets and check their existence.
+        fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True)
         if fileLocations:
             return True
         return False
@@ -1527,7 +1539,7 @@ def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
         # Docstring inherited from the base class.
 
         # The records themselves. Could be missing some entries.
-        records = self._get_stored_records_associated_with_refs(refs)
+        records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
 
         return {ref: ref.id in records for ref in refs}
 
@@ -1752,7 +1764,9 @@ def _mexists(
         requested_ids = set(id_to_ref.keys())
 
         # The records themselves. Could be missing some entries.
-        records = self._get_stored_records_associated_with_refs(id_to_ref.values())
+        records = self._get_stored_records_associated_with_refs(
+            id_to_ref.values(), ignore_datastore_records=True
+        )
 
         dataset_existence = self._process_mexists_records(
             id_to_ref, records, True, artifact_existence=artifact_existence
@@ -1848,7 +1862,9 @@ def exists(self, ref: DatasetRef) -> bool:
             though it is present in the local cache.
         """
         ref = self._cast_storage_class(ref)
-        fileLocations = self._get_dataset_locations_info(ref)
+        # We cannot trust datastore records from ref, as many unit tests delete
+        # datasets and check their existence.
+        fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True)
 
         # if we are being asked to trust that registry might not be correct
         # we ask for the expected locations and check them explicitly
@@ -2502,7 +2518,7 @@ def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = Tr
 
         # Determine which datasets are known to datastore directly.
         id_to_ref = {ref.id: ref for ref in refs}
-        existing_ids = self._get_stored_records_associated_with_refs(refs)
+        existing_ids = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
         existing_refs = {id_to_ref[ref_id] for ref_id in existing_ids}
 
         missing = refs - existing_refs
@@ -2747,7 +2763,9 @@ def transfer_from(
         # What we really want is all the records in the source datastore
         # associated with these refs. Or derived ones if they don't exist
         # in the source.
-        source_records = source_datastore._get_stored_records_associated_with_refs(refs)
+        source_records = source_datastore._get_stored_records_associated_with_refs(
+            refs, ignore_datastore_records=True
+        )
 
         # The source dataset_ids are the keys in these records
         source_ids = set(source_records)
@@ -2822,7 +2840,7 @@ def transfer_from(
                     source_records[missing].extend(dataset_records)
 
         # See if we already have these records
-        target_records = self._get_stored_records_associated_with_refs(refs)
+        target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
 
         # The artifacts to register
         artifacts = []
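
The batched helper _get_stored_records_associated_with_refs changed in the same way. Here is a sketch of that variant, reusing the hypothetical FakeRef and FakeTable stand-ins from the sketch above: refs whose embedded records are absent, or deliberately ignored, are collected and resolved against the table in a second pass.

from collections import defaultdict


def get_records_for_refs(
    table: FakeTable, refs: list[FakeRef], ignore_datastore_records: bool = False
) -> dict[int, list[str]]:
    records_by_id: dict[int, list[str]] = defaultdict(list)
    refs_with_no_records = []
    for ref in refs:
        # With the flag set, every ref is treated as if it carried no
        # records, so everything is resolved against the table.
        if ignore_datastore_records or ref.datastore_records is None:
            refs_with_no_records.append(ref)
        else:
            records_by_id[ref.id].extend(ref.datastore_records)
    for ref in refs_with_no_records:
        if rows := table.fetch(ref.id):
            records_by_id[ref.id].extend(rows)
    return dict(records_by_id)


# With an empty table, bypassing the embedded records leaves nothing:
refs = [FakeRef(id=1, datastore_records=["a.fits"]), FakeRef(id=2)]
assert get_records_for_refs(FakeTable(), refs, ignore_datastore_records=True) == {}

This is why knows_these can simply test ref.id in records after the change: a dataset whose row was deleted contributes no entry once its ref-supplied records are bypassed.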
