From 1c5ee3e5c3122c23e17c0fed01c14ac6c8a9c837 Mon Sep 17 00:00:00 2001 From: Andy Salnikov Date: Wed, 11 Oct 2023 09:39:09 -0700 Subject: [PATCH] Make FileDatastore ignore datastore records in some cases. Datastore records in DatasetRef can become invalidated in some cases; this is mostly true for our unit tests that do crazy things to test consistency. This patch adds a flag parameter to some methods that tells them to ignore ref-supplied datastore records. --- .../daf/butler/datastores/fileDatastore.py | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/python/lsst/daf/butler/datastores/fileDatastore.py b/python/lsst/daf/butler/datastores/fileDatastore.py index 58a6dc1a4e..81b50e3488 100644 --- a/python/lsst/daf/butler/datastores/fileDatastore.py +++ b/python/lsst/daf/butler/datastores/fileDatastore.py @@ -410,7 +410,9 @@ def addStoredItemInfo( case _: raise ValueError(f"Unknown insert mode of '{insert_mode}'") - def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]: + def getStoredItemsInfo( + self, ref: DatasetIdRef, ignore_datastore_records: bool = False + ) -> list[StoredFileInfo]: """Retrieve information associated with files stored in this `Datastore` associated with this dataset ref. @@ -418,6 +420,8 @@ def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]: ---------- ref : `DatasetRef` The dataset that is to be queried. + ignore_datastore_records : `bool` + If `True` then do not use datastore records stored in refs. Returns ------- @@ -428,7 +432,7 @@ def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]: list if no matching datasets can be found. """ # Try to get them from the ref first. - if ref._datastore_records is not None: + if ref._datastore_records is not None and not ignore_datastore_records: if (ref_records := ref._datastore_records.get(self._table.name)) is not None: # Need to make sure they have correct type. 
for record in ref_records: @@ -480,7 +484,7 @@ def _register_datasets( self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode) def _get_stored_records_associated_with_refs( - self, refs: Iterable[DatasetIdRef] + self, refs: Iterable[DatasetIdRef], ignore_datastore_records: bool = False ) -> dict[DatasetId, list[StoredFileInfo]]: """Retrieve all records associated with the provided refs. @@ -488,6 +492,8 @@ def _get_stored_records_associated_with_refs( ---------- refs : iterable of `DatasetIdRef` The refs for which records are to be retrieved. + ignore_datastore_records : `bool` + If `True` then do not use datastore records stored in refs. Returns ------- @@ -499,7 +505,7 @@ def _get_stored_records_associated_with_refs( records_by_ref: defaultdict[DatasetId, list[StoredFileInfo]] = defaultdict(list) refs_with_no_records = [] for ref in refs: - if ref._datastore_records is None: + if ignore_datastore_records or ref._datastore_records is None: refs_with_no_records.append(ref) else: if (ref_records := ref._datastore_records.get(self._table.name)) is not None: @@ -569,7 +575,9 @@ def removeStoredItemInfo(self, ref: DatasetIdRef) -> None: # tests that check that this method works, so we keep it for now. self._table.delete(["dataset_id"], {"dataset_id": ref.id}) - def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, StoredFileInfo]]: + def _get_dataset_locations_info( + self, ref: DatasetIdRef, ignore_datastore_records: bool = False + ) -> list[tuple[Location, StoredFileInfo]]: r"""Find all the `Location`\ s of the requested dataset in the `Datastore` and the associated stored file information. @@ -577,6 +585,8 @@ def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, ---------- ref : `DatasetRef` Reference to the required `Dataset`. + ignore_datastore_records : `bool` + If `True` then do not use datastore records stored in refs. 
Returns ------- @@ -585,7 +595,7 @@ def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, stored information about each file and its formatter. """ # Get the file information (this will fail if no file) - records = self.getStoredItemsInfo(ref) + records = self.getStoredItemsInfo(ref, ignore_datastore_records) # Use the path to determine the location -- we need to take # into account absolute URIs in the datastore record @@ -1518,7 +1528,9 @@ def knows(self, ref: DatasetRef) -> bool: exists : `bool` `True` if the dataset is known to the datastore. """ - fileLocations = self._get_dataset_locations_info(ref) + # We cannot trust datastore records from ref, as many unit tests delete + # datasets and check their existence. + fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True) if fileLocations: return True return False @@ -1527,7 +1539,7 @@ def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]: # Docstring inherited from the base class. # The records themselves. Could be missing some entries. - records = self._get_stored_records_associated_with_refs(refs) + records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) return {ref: ref.id in records for ref in refs} @@ -1752,7 +1764,9 @@ def _mexists( requested_ids = set(id_to_ref.keys()) # The records themselves. Could be missing some entries. - records = self._get_stored_records_associated_with_refs(id_to_ref.values()) + records = self._get_stored_records_associated_with_refs( + id_to_ref.values(), ignore_datastore_records=True + ) dataset_existence = self._process_mexists_records( id_to_ref, records, True, artifact_existence=artifact_existence @@ -1848,7 +1862,9 @@ def exists(self, ref: DatasetRef) -> bool: though it is present in the local cache. 
""" ref = self._cast_storage_class(ref) - fileLocations = self._get_dataset_locations_info(ref) + # We cannot trust datastore records from ref, as many unit tests delete + # datasets and check their existence. + fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True) # if we are being asked to trust that registry might not be correct # we ask for the expected locations and check them explicitly @@ -2502,7 +2518,7 @@ def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = Tr # Determine which datasets are known to datastore directly. id_to_ref = {ref.id: ref for ref in refs} - existing_ids = self._get_stored_records_associated_with_refs(refs) + existing_ids = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) existing_refs = {id_to_ref[ref_id] for ref_id in existing_ids} missing = refs - existing_refs @@ -2747,7 +2763,9 @@ def transfer_from( # What we really want is all the records in the source datastore # associated with these refs. Or derived ones if they don't exist # in the source. - source_records = source_datastore._get_stored_records_associated_with_refs(refs) + source_records = source_datastore._get_stored_records_associated_with_refs( + refs, ignore_datastore_records=True + ) # The source dataset_ids are the keys in these records source_ids = set(source_records) @@ -2822,7 +2840,7 @@ def transfer_from( source_records[missing].extend(dataset_records) # See if we already have these records - target_records = self._get_stored_records_associated_with_refs(refs) + target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) # The artifacts to register artifacts = []