diff --git a/python/lsst/daf/butler/_butler.py b/python/lsst/daf/butler/_butler.py index a8110f6c15..a8a3e0a18d 100644 --- a/python/lsst/daf/butler/_butler.py +++ b/python/lsst/daf/butler/_butler.py @@ -237,6 +237,10 @@ def __init__( self._datastore = Datastore.fromConfig( self._config, self._registry.getDatastoreBridgeManager(), butlerRoot=butlerRoot ) + # TODO: Once datastore drops dependency on registry we can + # construct datastore first and pass opaque tables to registry + # constructor. + self._registry.make_datastore_tables(self._datastore.get_opaque_table_definitions()) self.storageClasses = StorageClassFactory() self.storageClasses.addFromConfig(self._config) except Exception: @@ -1004,6 +1008,7 @@ def _findDatasetRef( collections: Any = None, predict: bool = False, run: str | None = None, + datastore_records: bool = False, **kwargs: Any, ) -> DatasetRef: """Shared logic for methods that start with a search for a dataset in @@ -1029,6 +1034,8 @@ def _findDatasetRef( run : `str`, optional Run collection name to use for creating `DatasetRef` for predicted datasets. Only used if ``predict`` is `True`. + datastore_records : `bool`, optional + If `True` add datastore records to returned `DatasetRef`. **kwargs Additional keyword arguments used to augment or construct a `DataId`. See `DataId` parameters. @@ -1055,6 +1062,9 @@ def _findDatasetRef( if isinstance(datasetRefOrType, DatasetRef): if collections is not None: warnings.warn("Collections should not be specified with DatasetRef", stacklevel=3) + # May need to retrieve datastore records if requested. + if datastore_records and datasetRefOrType._datastore_records is None: + datasetRefOrType = self._registry.get_datastore_records(datasetRefOrType) return datasetRefOrType timespan: Timespan | None = None @@ -1081,7 +1091,13 @@ def _findDatasetRef( ) # Always lookup the DatasetRef, even if one is given, to ensure it is # present in the current collection. - ref = self._registry.findDataset(datasetType, dataId, collections=collections, timespan=timespan) + ref = self._registry.findDataset( + datasetType, + dataId, + collections=collections, + timespan=timespan, + datastore_records=datastore_records, + ) if ref is None: if predict: if run is None: @@ -1102,7 +1118,9 @@ def _findDatasetRef( # registry definition. The DatasetRef must therefore be recreated # using the user definition such that the expected type is # returned. - ref = DatasetRef(datasetType, ref.dataId, run=ref.run, id=ref.id) + ref = DatasetRef( + datasetType, ref.dataId, run=ref.run, id=ref.id, datastore_records=ref._datastore_records + ) return ref @@ -1419,7 +1437,9 @@ def get( ``exposure`` is a temporal dimension. 
""" log.debug("Butler get: %s, dataId=%s, parameters=%s", datasetRefOrType, dataId, parameters) - ref = self._findDatasetRef(datasetRefOrType, dataId, collections=collections, **kwargs) + ref = self._findDatasetRef( + datasetRefOrType, dataId, collections=collections, datastore_records=True, **kwargs + ) return self._datastore.get(ref, parameters=parameters, storageClass=storageClass) def getURIs( diff --git a/python/lsst/daf/butler/_dataset_ref.py b/python/lsst/daf/butler/_dataset_ref.py index 32c75fa9c3..5b247d9186 100644 --- a/python/lsst/daf/butler/_dataset_ref.py +++ b/python/lsst/daf/butler/_dataset_ref.py @@ -28,6 +28,7 @@ __all__ = [ "AmbiguousDatasetError", + "DatasetDatastoreRecords", "DatasetId", "DatasetIdFactory", "DatasetIdGenEnum", @@ -38,8 +39,8 @@ import enum import sys import uuid -from collections.abc import Iterable -from typing import TYPE_CHECKING, Any, ClassVar, Protocol, TypeAlias, runtime_checkable +from collections.abc import Iterable, Mapping +from typing import TYPE_CHECKING, Any, ClassVar, Literal, Protocol, TypeAlias, runtime_checkable import pydantic from lsst.daf.butler._compat import PYDANTIC_V2, _BaseModelCompat @@ -49,6 +50,7 @@ from ._config_support import LookupKey from ._dataset_type import DatasetType, SerializedDatasetType from ._named import NamedKeyDict +from .datastore.stored_file_info import StoredDatastoreItemInfo from .dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate from .json import from_json_pydantic, to_json_pydantic from .persistence_context import PersistenceContextVars @@ -57,6 +59,10 @@ from ._storage_class import StorageClass from .registry import Registry +# Per-dataset records grouped by opaque table name, usually there is just one +# opaque table. +DatasetDatastoreRecords: TypeAlias = Mapping[str, Iterable[StoredDatastoreItemInfo]] + class AmbiguousDatasetError(Exception): """Raised when a `DatasetRef` is not resolved but should be. @@ -176,6 +182,10 @@ def makeDatasetId( # This is constant, so don't recreate a set for each instance _serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"} +# Serialized representation of StoredDatastoreItemInfo collection, first item +# is the record class name. +_DatastoreRecords: TypeAlias = tuple[str, list[Mapping[str, Any]]] + class SerializedDatasetRef(_BaseModelCompat): """Simplified model of a `DatasetRef` suitable for serialization.""" @@ -185,6 +195,8 @@ class SerializedDatasetRef(_BaseModelCompat): dataId: SerializedDataCoordinate | None = None run: StrictStr | None = None component: StrictStr | None = None + datastore_records: Mapping[str, _DatastoreRecords] | None = None + """Maps opaque table name to datastore records.""" if PYDANTIC_V2: # Can not use "after" validator since in some cases the validator @@ -225,6 +237,7 @@ def direct( datasetType: dict[str, Any] | None = None, dataId: dict[str, Any] | None = None, component: str | None = None, + datastore_records: Mapping[str, _DatastoreRecords] | None = None, ) -> SerializedDatasetRef: """Construct a `SerializedDatasetRef` directly without validators. 
@@ -251,6 +264,7 @@ def direct( dataId=serialized_dataId, run=sys.intern(run), component=component, + datastore_records=datastore_records, ) return node @@ -306,6 +320,7 @@ class DatasetRef: "datasetType", "dataId", "run", + "_datastore_records", ) def __init__( @@ -317,6 +332,7 @@ def __init__( id: DatasetId | None = None, conform: bool = True, id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE, + datastore_records: DatasetDatastoreRecords | None = None, ): self.datasetType = datasetType if conform: @@ -332,6 +348,7 @@ def __init__( .makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode) .int ) + self._datastore_records = datastore_records @property def id(self) -> DatasetId: @@ -413,11 +430,19 @@ def to_simple(self, minimal: bool = False) -> SerializedDatasetRef: simple["component"] = self.datasetType.component() return SerializedDatasetRef(**simple) + datastore_records: Mapping[str, _DatastoreRecords] | None = None + if self._datastore_records is not None: + datastore_records = {} + for opaque_name, records in self._datastore_records.items(): + class_name, record_dicts = StoredDatastoreItemInfo.to_records(records) + datastore_records[opaque_name] = class_name, list(record_dicts) + return SerializedDatasetRef( datasetType=self.datasetType.to_simple(minimal=minimal), dataId=self.dataId.to_simple(), run=self.run, id=self.id, + datastore_records=datastore_records, ) @classmethod @@ -512,7 +537,21 @@ def from_simple( f"Encountered with {simple!r}{dstr}." ) - newRef = cls(datasetType, dataId, id=simple.id, run=simple.run) + # rebuild datastore records + datastore_records: DatasetDatastoreRecords | None = None + if simple.datastore_records is not None: + datastore_records = {} + for opaque_name, (class_name, records) in simple.datastore_records.items(): + infos = StoredDatastoreItemInfo.from_records(class_name, records) + datastore_records[opaque_name] = infos + + newRef = cls( + datasetType, + dataId, + id=simple.id, + run=simple.run, + datastore_records=datastore_records, + ) if cache is not None: cache[key] = newRef return newRef @@ -527,16 +566,20 @@ def _unpickle( dataId: DataCoordinate, id: DatasetId, run: str, + datastore_records: DatasetDatastoreRecords | None, ) -> DatasetRef: """Create new `DatasetRef`. A custom factory method for use by `__reduce__` as a workaround for its lack of support for keyword arguments. """ - return cls(datasetType, dataId, id=id, run=run) + return cls(datasetType, dataId, id=id, run=run, datastore_records=datastore_records) def __reduce__(self) -> tuple: - return (self._unpickle, (self.datasetType, self.dataId, self.id, self.run)) + return ( + self._unpickle, + (self.datasetType, self.dataId, self.id, self.run, self._datastore_records), + ) def __deepcopy__(self, memo: dict) -> DatasetRef: # DatasetRef is recursively immutable; see note in @immutable @@ -559,7 +602,12 @@ def expanded(self, dataId: DataCoordinate) -> DatasetRef: """ assert dataId == self.dataId return DatasetRef( - datasetType=self.datasetType, dataId=dataId, id=self.id, run=self.run, conform=False + datasetType=self.datasetType, + dataId=dataId, + id=self.id, + run=self.run, + conform=False, + datastore_records=self._datastore_records, ) def isComponent(self) -> bool: @@ -669,7 +717,12 @@ def makeCompositeRef(self) -> DatasetRef: # Assume that the data ID does not need to be standardized # and should match whatever this ref already has. 
return DatasetRef( - self.datasetType.makeCompositeDatasetType(), self.dataId, id=self.id, run=self.run, conform=False + self.datasetType.makeCompositeDatasetType(), + self.dataId, + id=self.id, + run=self.run, + conform=False, + datastore_records=self._datastore_records, ) def makeComponentRef(self, name: str) -> DatasetRef: @@ -695,6 +748,7 @@ def makeComponentRef(self, name: str) -> DatasetRef: id=self.id, run=self.run, conform=False, + datastore_records=self._datastore_records, ) def overrideStorageClass(self, storageClass: str | StorageClass) -> DatasetRef: @@ -720,6 +774,7 @@ def replace( id: DatasetId | None = None, run: str | None = None, storage_class: str | StorageClass | None = None, + datastore_records: DatasetDatastoreRecords | None | Literal[False] = False, ) -> DatasetRef: """Create a new `DatasetRef` from this one, but with some modified attributes. @@ -734,12 +789,17 @@ def replace( storage_class : `str` or `StorageClass` or `None`. The new storage class. If not `None`, replaces existing storage class. + datastore_records : `DatasetDatastoreRecords` or `None` + New datastore records. If `None` remove all records. By default + datastore records are preserved. Returns ------- modified : `DatasetRef` A new dataset reference with updated attributes. """ + if datastore_records is False: + datastore_records = self._datastore_records if storage_class is None: datasetType = self.datasetType else: @@ -755,6 +815,7 @@ def replace( run=run, id=id, conform=False, + datastore_records=datastore_records, ) def is_compatible_with(self, ref: DatasetRef) -> bool: @@ -821,3 +882,9 @@ class associated with the dataset type of the other ref can be Cannot be changed after a `DatasetRef` is constructed. """ + + datastore_records: DatasetDatastoreRecords | None + """Optional datastore records (`DatasetDatastoreRecords`). + + Cannot be changed after a `DatasetRef` is constructed. + """ diff --git a/python/lsst/daf/butler/_formatter.py b/python/lsst/daf/butler/_formatter.py index 49529c4956..b9f122fc01 100644 --- a/python/lsst/daf/butler/_formatter.py +++ b/python/lsst/daf/butler/_formatter.py @@ -34,29 +34,28 @@ import logging from abc import ABCMeta, abstractmethod from collections.abc import Iterator, Mapping, Set -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias from lsst.utils.introspection import get_full_type_name from ._config import Config from ._config_support import LookupKey, processLookupConfigs -from ._dataset_ref import DatasetRef -from ._dataset_type import DatasetType from ._file_descriptor import FileDescriptor from ._location import Location -from ._storage_class import StorageClass from .dimensions import DimensionUniverse from .mapping_factory import MappingFactory log = logging.getLogger(__name__) -# Define a new special type for functions that take "entity" -Entity = DatasetType | DatasetRef | StorageClass | str - - if TYPE_CHECKING: + from ._dataset_ref import DatasetRef + from ._dataset_type import DatasetType + from ._storage_class import StorageClass from .dimensions import DataCoordinate + # Define a new special type for functions that take "entity" + Entity: TypeAlias = DatasetType | DatasetRef | StorageClass | str + class Formatter(metaclass=ABCMeta): """Interface for reading and writing Datasets. 
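Before moving on to the datastore changes below, a brief sketch of how the `DatasetRef` API above is intended to behave may help: datastore records ride along as an opaque payload, preserved through `expanded`, `makeComponentRef`, pickling and `replace` unless explicitly cleared. The sketch assumes an already-resolved `ref` and a `records` mapping (both hypothetical names, not defined in this patch):

    # `datastore_records` defaults to `False`, a sentinel meaning "keep what
    # the ref already carries"; `None` drops the records; a mapping replaces them.
    kept = ref.replace()
    cleared = ref.replace(datastore_records=None)
    updated = ref.replace(datastore_records=records)

    assert kept._datastore_records is ref._datastore_records
    assert cleared._datastore_records is None
    assert updated._datastore_records is records

This is also why `Butler.get` at the top of the diff now calls `_findDatasetRef(..., datastore_records=True)`: the datastore can read file records straight off the ref instead of querying its opaque table.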
diff --git a/python/lsst/daf/butler/datastore/_datastore.py b/python/lsst/daf/butler/datastore/_datastore.py index 5b2567ec2b..f0226643f3 100644 --- a/python/lsst/daf/butler/datastore/_datastore.py +++ b/python/lsst/daf/butler/datastore/_datastore.py @@ -30,10 +30,11 @@ from __future__ import annotations __all__ = ( - "DatastoreConfig", + "DatasetRefURIs", "Datastore", + "DatastoreConfig", + "DatastoreOpaqueTable", "DatastoreValidationError", - "DatasetRefURIs", "NullDatastore", "DatastoreTransaction", ) @@ -58,12 +59,14 @@ if TYPE_CHECKING: from lsst.resources import ResourcePath, ResourcePathExpression + from .. import ddl from .._config_support import LookupKey from .._dataset_ref import DatasetRef from .._dataset_type import DatasetType from .._storage_class import StorageClass from ..registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager from .record_data import DatastoreRecordData + from .stored_file_info import StoredDatastoreItemInfo _LOG = logging.getLogger(__name__) @@ -93,6 +96,19 @@ class Event: kwargs: dict +@dataclasses.dataclass(frozen=True) +class DatastoreOpaqueTable: + """Definition of the opaque table which stores datastore records. + + Table definition contains a `.ddl.TableSpec` for the table and a record + class which must be a subclass of `StoredDatastoreItemInfo`. + """ + + __slots__ = {"table_spec", "record_class"} + table_spec: ddl.TableSpec + record_class: type[StoredDatastoreItemInfo] + + class IngestPrepData: """A helper base class for `Datastore` ingest implementations. @@ -537,6 +553,26 @@ def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None: """ raise NotImplementedError("Must be implemented by subclass") + @abstractmethod + def put_new(self, in_memory_dataset: Any, dataset_ref: DatasetRef) -> Mapping[str, DatasetRef]: + """Write an `InMemoryDataset` with a given `DatasetRef` to the store. + + Parameters + ---------- + in_memory_dataset : `object` + The Dataset to store. + dataset_ref : `DatasetRef` + Reference to the associated Dataset. + + Returns + ------- + datastore_refs : `~collections.abc.Mapping` [`str`, `DatasetRef`] + Mapping of datastore name to a dataset reference stored in that + datastore; the reference will include datastore records. Only + non-ephemeral datastores will appear in this mapping. + """ + raise NotImplementedError("Must be implemented by subclass") + def _overrideTransferMode(self, *datasets: FileDataset, transfer: str | None = None) -> str | None: """Allow ingest transfer mode to be defaulted based on datasets. @@ -1227,6 +1263,19 @@ def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | """ pass + @abstractmethod + def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: + """Make definitions of the opaque tables used by this Datastore. + + Returns + ------- + tables : `~collections.abc.Mapping` [ `str`, `DatastoreOpaqueTable` ] + Mapping of opaque table names to their definitions. This can be an + empty mapping if Datastore does not use opaque tables to keep + datastore records.
+ """ + raise NotImplementedError() + class NullDatastore(Datastore): """A datastore that implements the `Datastore` API but always fails when @@ -1268,6 +1317,9 @@ def get( def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None: raise NotImplementedError("This is a no-op datastore that can not access a real datastore") + def put_new(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> Mapping[str, DatasetRef]: + raise NotImplementedError("This is a no-op datastore that can not access a real datastore") + def ingest( self, *datasets: FileDataset, transfer: str | None = None, record_validation_info: bool = True ) -> None: @@ -1345,3 +1397,6 @@ def export_records( refs: Iterable[DatasetIdRef], ) -> Mapping[str, DatastoreRecordData]: raise NotImplementedError("This is a no-op datastore that can not access a real datastore") + + def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: + return {} diff --git a/python/lsst/daf/butler/datastore/generic_base.py b/python/lsst/daf/butler/datastore/generic_base.py index 02f32133a8..9fe2c71f96 100644 --- a/python/lsst/daf/butler/datastore/generic_base.py +++ b/python/lsst/daf/butler/datastore/generic_base.py @@ -32,136 +32,35 @@ __all__ = ("GenericBaseDatastore",) import logging -from abc import abstractmethod -from collections.abc import Iterable, Mapping, Sequence -from typing import TYPE_CHECKING, Any +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any, Generic, TypeVar from .._exceptions import DatasetTypeNotSupportedError -from ..registry.interfaces import DatabaseInsertMode, DatastoreRegistryBridge -from ._datastore import Datastore +from ..datastore._datastore import Datastore +from .stored_file_info import StoredDatastoreItemInfo if TYPE_CHECKING: from .._dataset_ref import DatasetRef from .._storage_class import StorageClass - from .stored_file_info import StoredDatastoreItemInfo log = logging.getLogger(__name__) +_InfoType = TypeVar("_InfoType", bound=StoredDatastoreItemInfo) -class GenericBaseDatastore(Datastore): + +class GenericBaseDatastore(Datastore, Generic[_InfoType]): """Methods useful for most implementations of a `Datastore`. Should always be sub-classed since key abstract methods are missing. """ - @property - @abstractmethod - def bridge(self) -> DatastoreRegistryBridge: - """Object that manages the interface between this `Datastore` and the - `Registry` (`DatastoreRegistryBridge`). - """ - raise NotImplementedError() - - @abstractmethod - def addStoredItemInfo( - self, - refs: Iterable[DatasetRef], - infos: Iterable[Any], - insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, - ) -> None: - """Record internal storage information associated with one or more - datasets. - - Parameters - ---------- - refs : sequence of `DatasetRef` - The datasets that have been stored. - infos : sequence of `StoredDatastoreItemInfo` - Metadata associated with the stored datasets. - insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode` - Mode to use to insert the new records into the table. The - options are ``INSERT`` (error if pre-existing), ``REPLACE`` - (replace content with new values), and ``ENSURE`` (skip if the row - already exists). - """ - raise NotImplementedError() - - @abstractmethod - def getStoredItemsInfo(self, ref: DatasetRef) -> Sequence[Any]: - """Retrieve information associated with files stored in this - `Datastore` associated with this dataset ref. - - Parameters - ---------- - ref : `DatasetRef` - The dataset that is to be queried. 
- - Returns - ------- - items : `list` [`StoredDatastoreItemInfo`] - Stored information about the files and associated formatters - associated with this dataset. Only one file will be returned - if the dataset has not been disassembled. Can return an empty - list if no matching datasets can be found. - """ - raise NotImplementedError() - - @abstractmethod - def removeStoredItemInfo(self, ref: DatasetRef) -> None: - """Remove information about the file associated with this dataset. - - Parameters - ---------- - ref : `DatasetRef` - The dataset that has been removed. - """ - raise NotImplementedError() - - def _register_datasets( - self, - refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]], - insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, - ) -> None: - """Update registry to indicate that one or more datasets have been - stored. - - Parameters - ---------- - refsAndInfos : sequence `tuple` [`DatasetRef`, - `StoredDatastoreItemInfo`] - Datasets to register and the internal datastore metadata associated - with them. - insert_mode : `str`, optional - Indicate whether the new records should be new ("insert", default), - or allowed to exists ("ensure") or be replaced if already present - ("replace"). - """ - expandedRefs: list[DatasetRef] = [] - expandedItemInfos = [] - - for ref, itemInfo in refsAndInfos: - expandedRefs.append(ref) - expandedItemInfos.append(itemInfo) - - # Dataset location only cares about registry ID so if we have - # disassembled in datastore we have to deduplicate. Since they - # will have different datasetTypes we can't use a set - registryRefs = {r.id: r for r in expandedRefs} - if insert_mode == DatabaseInsertMode.INSERT: - self.bridge.insert(registryRefs.values()) - else: - # There are only two columns and all that matters is the - # dataset ID. - self.bridge.ensure(registryRefs.values()) - self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode) - def _post_process_get( self, - inMemoryDataset: Any, + inMemoryDataset: object, readStorageClass: StorageClass, assemblerParams: Mapping[str, Any] | None = None, isComponent: bool = False, - ) -> Any: + ) -> object: """Given the Python object read from the datastore, manipulate it based on the supplied parameters and ensure the Python type is correct. @@ -177,6 +76,11 @@ def _post_process_get( Parameters to pass to the assembler. Can be `None`. isComponent : `bool`, optional If this is a component, allow the inMemoryDataset to be `None`. + + Returns + ------- + dataset : `object` + In-memory dataset, potentially converted to expected type. """ # Process any left over parameters if assemblerParams: @@ -198,7 +102,7 @@ def _post_process_get( return inMemoryDataset - def _validate_put_parameters(self, inMemoryDataset: Any, ref: DatasetRef) -> None: + def _validate_put_parameters(self, inMemoryDataset: object, ref: DatasetRef) -> None: """Validate the supplied arguments for put. Parameters @@ -265,7 +169,6 @@ def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None: The external `Datastore` from which to retreive the Dataset. ref : `DatasetRef` Reference to the required dataset in the input data store. - """ assert inputDatastore is not self # unless we want it for renames? 
inMemoryDataset = inputDatastore.get(ref) diff --git a/python/lsst/daf/butler/datastore/record_data.py b/python/lsst/daf/butler/datastore/record_data.py index 0badc73d4d..64bd6b88e5 100644 --- a/python/lsst/daf/butler/datastore/record_data.py +++ b/python/lsst/daf/butler/datastore/record_data.py @@ -37,8 +37,6 @@ from typing import TYPE_CHECKING, Any, TypeAlias from lsst.daf.butler._compat import PYDANTIC_V2, _BaseModelCompat -from lsst.utils import doImportType -from lsst.utils.introspection import get_full_type_name from .._dataset_ref import DatasetId from ..dimensions import DimensionUniverse @@ -63,15 +61,17 @@ class SerializedDatastoreRecordData(_BaseModelCompat): dataset_ids: list[uuid.UUID] """List of dataset IDs""" - records: Mapping[str, Mapping[str, list[_Record]]] - """List of records indexed by record class name and table name.""" + records: Mapping[str, Mapping[str, Mapping[str, list[_Record]]]] + """List of records indexed by record class name, dataset ID (encoded as + str, because JSON), and opaque table name. + """ @classmethod def direct( cls, *, dataset_ids: list[str | uuid.UUID], - records: dict[str, dict[str, list[_Record]]], + records: dict[str, dict[str, dict[str, list[_Record]]]], ) -> SerializedDatastoreRecordData: """Construct a `SerializedDatastoreRecordData` directly without validators. @@ -83,15 +83,6 @@ def direct( This method should only be called when the inputs are trusted. """ - # See also comments in record_ids_to_uuid() - for table_data in records.values(): - for table_records in table_data.values(): - for record in table_records: - # This only checks dataset_id value, if there are any other - # columns that are UUIDs we'd need more generic approach. - if (id := record.get("dataset_id")) is not None: - record["dataset_id"] = uuid.UUID(id) if isinstance(id, str) else id - data = cls.model_construct( _fields_set={"dataset_ids", "records"}, # JSON makes strings out of UUIDs, need to convert them back @@ -182,26 +173,13 @@ def to_simple(self, minimal: bool = False) -> SerializedDatastoreRecordData: simple : `dict` Representation of this instance as a simple dictionary. """ - - def _class_name(records: list[StoredDatastoreItemInfo]) -> str: - """Get fully qualified class name for the records. Empty string - returned if list is empty. Exception is raised if records are of - different classes. - """ - if not records: - return "" - classes = {record.__class__ for record in records} - assert len(classes) == 1, f"Records have to be of the same class: {classes}" - return get_full_type_name(classes.pop()) - - records: dict[str, dict[str, list[_Record]]] = {} - for table_data in self.records.values(): + records: dict[str, dict[str, dict[str, list[_Record]]]] = {} + for dataset_id, table_data in self.records.items(): for table_name, table_records in table_data.items(): - class_name = _class_name(table_records) + class_name, infos = StoredDatastoreItemInfo.to_records(table_records) class_records = records.setdefault(class_name, {}) - class_records.setdefault(table_name, []).extend( - [record.to_record() for record in table_records] - ) + dataset_records = class_records.setdefault(dataset_id.hex, {}) + dataset_records.setdefault(table_name, []).extend(dict(info) for info in infos) return SerializedDatastoreRecordData(dataset_ids=list(self.records.keys()), records=records) @classmethod @@ -238,29 +216,18 @@ def from_simple( # have records. 
for dataset_id in simple.dataset_ids: records[dataset_id] = {} - for class_name, table_data in simple.records.items(): - try: - klass = doImportType(class_name) - except ImportError: - # Prior to DM-41043 we were embedding a lsst.daf.butler.core - # path in the serialized form, which we never wanted; fix this - # one case. - if class_name == "lsst.daf.butler.core.storedFileInfo.StoredFileInfo": - from .stored_file_info import StoredFileInfo - - klass = StoredFileInfo - else: - raise - if not issubclass(klass, StoredDatastoreItemInfo): - raise RuntimeError( - "The class specified in the SerializedDatastoreRecordData " - f"({get_full_type_name(klass)}) is not a StoredDatastoreItemInfo." - ) - for table_name, table_records in table_data.items(): - for record in table_records: - info = klass.from_record(record) - dataset_type_records = records.setdefault(info.dataset_id, {}) - dataset_type_records.setdefault(table_name, []).append(info) + for class_name, class_data in simple.records.items(): + for dataset_id_str, dataset_data in class_data.items(): + for table_name, table_records in dataset_data.items(): + try: + infos = StoredDatastoreItemInfo.from_records(class_name, table_records) + except TypeError as exc: + raise RuntimeError( + "The class specified in the SerializedDatastoreRecordData " + f"({class_name}) is not a StoredDatastoreItemInfo." + ) from exc + dataset_records = records.setdefault(uuid.UUID(dataset_id_str), {}) + dataset_records.setdefault(table_name, []).extend(infos) newRecord = cls(records=records) if cache is not None: cache[key] = newRecord diff --git a/python/lsst/daf/butler/datastore/stored_file_info.py b/python/lsst/daf/butler/datastore/stored_file_info.py index a38ff6742d..ab1572d901 100644 --- a/python/lsst/daf/butler/datastore/stored_file_info.py +++ b/python/lsst/daf/butler/datastore/stored_file_info.py @@ -30,18 +30,20 @@ __all__ = ("StoredDatastoreItemInfo", "StoredFileInfo") import inspect -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from dataclasses import dataclass from typing import TYPE_CHECKING, Any from lsst.resources import ResourcePath +from lsst.utils import doImportType +from lsst.utils.introspection import get_full_type_name from .._formatter import Formatter, FormatterParameter from .._location import Location, LocationFactory from .._storage_class import StorageClass, StorageClassFactory if TYPE_CHECKING: - from .. import DatasetId, DatasetRef + from .._dataset_ref import DatasetRef # String to use when a Python None is encountered NULLSTR = "__NULL_STRING__" @@ -87,13 +89,14 @@ def from_record(cls: type[StoredDatastoreItemInfo], record: Mapping[str, Any]) - """ raise NotImplementedError() - def to_record(self) -> dict[str, Any]: - """Convert record contents to a dictionary.""" - raise NotImplementedError() + def to_record(self, **kwargs: Any) -> dict[str, Any]: + """Convert record contents to a dictionary. - @property - def dataset_id(self) -> DatasetId: - """Dataset ID associated with this record (`DatasetId`).""" + Parameters + ---------- + **kwargs + Additional items to add to returned record. + """ raise NotImplementedError() def update(self, **kwargs: Any) -> StoredDatastoreItemInfo: @@ -102,13 +105,75 @@ def update(self, **kwargs: Any) -> StoredDatastoreItemInfo: """ raise NotImplementedError() + @classmethod + def to_records( + cls, records: Iterable[StoredDatastoreItemInfo], **kwargs: Any + ) -> tuple[str, Iterable[Mapping[str, Any]]]: + """Convert a collection of records to dictionaries. 
+ + Parameters + ---------- + records : `~collections.abc.Iterable` [ `StoredDatastoreItemInfo` ] + A collection of records, all records must be of the same type. + **kwargs + Additional items to add to each returned record. + + Returns + ------- + class_name : `str` + Name of the record class. + records : `list` [ `dict` ] + Records in their dictionary representation. + """ + if not records: + return "", [] + classes = {record.__class__ for record in records} + assert len(classes) == 1, f"Records have to be of the same class: {classes}" + return get_full_type_name(classes.pop()), [record.to_record(**kwargs) for record in records] + + @classmethod + def from_records( + cls, class_name: str, records: Iterable[Mapping[str, Any]] + ) -> list[StoredDatastoreItemInfo]: + """Convert collection of dictionaries to records. + + Parameters + ---------- + class_name : `str` + Name of the record class. + records : `~collections.abc.Iterable` [ `dict` ] + Records in their dictionary representation. -@dataclass(frozen=True) + Returns + ------- + infos : `list` [`StoredDatastoreItemInfo`] + Sequence of records converted to typed representation. + + Raises + ------ + TypeError + Raised if ``class_name`` is not a sub-class of + `StoredDatastoreItemInfo`. + """ + try: + klass = doImportType(class_name) + except ImportError: + # Prior to DM-41043 we were embedding a lsst.daf.butler.core + # path in the serialized form, which we never wanted; fix this + # one case. + if class_name == "lsst.daf.butler.core.storedFileInfo.StoredFileInfo": + klass = StoredFileInfo + else: + raise + if not issubclass(klass, StoredDatastoreItemInfo): + raise TypeError(f"Class {class_name} is not a subclass of StoredDatastoreItemInfo") + return [klass.from_record(record) for record in records] + + +@dataclass(frozen=True, slots=True) class StoredFileInfo(StoredDatastoreItemInfo): """Datastore-private metadata associated with a Datastore file.""" - __slots__ = {"formatter", "path", "storageClass", "component", "checksum", "file_size", "dataset_id"} - storageClassFactory = StorageClassFactory() def __init__( @@ -119,7 +184,6 @@ def __init__( component: str | None, checksum: str | None, file_size: int, - dataset_id: DatasetId, ): # Use these shenanigans to allow us to use a frozen dataclass object.__setattr__(self, "path", path) @@ -127,7 +191,6 @@ def __init__( object.__setattr__(self, "component", component) object.__setattr__(self, "checksum", checksum) object.__setattr__(self, "file_size", file_size) - object.__setattr__(self, "dataset_id", dataset_id) if isinstance(formatter, str): # We trust that this string refers to a Formatter @@ -160,9 +223,6 @@ def __init__( file_size: int """Size of the serialized dataset in bytes.""" - dataset_id: DatasetId - """DatasetId associated with this record.""" - def rebase(self, ref: DatasetRef) -> StoredFileInfo: """Return a copy of the record suitable for a specified reference. @@ -177,14 +237,13 @@ def rebase(self, ref: DatasetRef) -> StoredFileInfo: record : `StoredFileInfo` New record instance. 
""" - # take component and dataset_id from the ref, rest comes from self + # take component from the ref, rest comes from self component = ref.datasetType.component() if component is None: component = self.component - dataset_id = ref.id - return self.update(dataset_id=dataset_id, component=component) + return self.update(component=component) - def to_record(self) -> dict[str, Any]: + def to_record(self, **kwargs: Any) -> dict[str, Any]: """Convert the supplied ref to a database record.""" component = self.component if component is None: @@ -192,13 +251,13 @@ def to_record(self) -> dict[str, Any]: # primary key. component = NULLSTR return dict( - dataset_id=self.dataset_id, formatter=self.formatter, path=self.path, storage_class=self.storageClass.name, component=component, checksum=self.checksum, file_size=self.file_size, + **kwargs, ) def file_location(self, factory: LocationFactory) -> Location: @@ -238,7 +297,6 @@ def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredF # Convert name of StorageClass to instance storageClass = cls.storageClassFactory.getStorageClass(record["storage_class"]) component = record["component"] if (record["component"] and record["component"] != NULLSTR) else None - info = cls( formatter=record["formatter"], path=record["path"], @@ -246,7 +304,6 @@ def from_record(cls: type[StoredFileInfo], record: Mapping[str, Any]) -> StoredF component=component, checksum=record["checksum"], file_size=record["file_size"], - dataset_id=record["dataset_id"], ) return info diff --git a/python/lsst/daf/butler/datastores/chainedDatastore.py b/python/lsst/daf/butler/datastores/chainedDatastore.py index f6af89048c..28986d7a23 100644 --- a/python/lsst/daf/butler/datastores/chainedDatastore.py +++ b/python/lsst/daf/butler/datastores/chainedDatastore.py @@ -38,8 +38,14 @@ from collections.abc import Iterable, Mapping, Sequence from typing import TYPE_CHECKING, Any -from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, Datastore, FileDataset -from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreValidationError +from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, FileDataset +from lsst.daf.butler.datastore import ( + DatasetRefURIs, + Datastore, + DatastoreConfig, + DatastoreOpaqueTable, + DatastoreValidationError, +) from lsst.daf.butler.datastore.constraints import Constraints from lsst.daf.butler.datastore.record_data import DatastoreRecordData from lsst.resources import ResourcePath @@ -206,12 +212,7 @@ def __init__( # We declare we are ephemeral if all our child datastores declare # they are ephemeral - isEphemeral = True - for d in self.datastores: - if not d.isEphemeral: - isEphemeral = False - break - self.isEphemeral = isEphemeral + self.isEphemeral = all(d.isEphemeral for d in self.datastores) # per-datastore override constraints if "datastore_constraints" in self.config: @@ -444,6 +445,53 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None: if self._transaction is not None: self._transaction.registerUndo("put", self.remove, ref) + def put_new(self, inMemoryDataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]: + # Docstring inherited from base class. + log.debug("Put %s", ref) + + # Confirm that we can accept this dataset + if not self.constraints.isAcceptable(ref): + # Raise rather than use boolean return value. + raise DatasetTypeNotSupportedError( + f"Dataset {ref} has been rejected by this datastore via configuration." 
+ ) + + isPermanent = False + nsuccess = 0 + npermanent = 0 + nephemeral = 0 + stored_refs: dict[str, DatasetRef] = {} + for datastore, constraints in zip(self.datastores, self.datastoreConstraints): + if ( + constraints is not None and not constraints.isAcceptable(ref) + ) or not datastore.constraints.isAcceptable(ref): + log.debug("Datastore %s skipping put via configuration for ref %s", datastore.name, ref) + continue + + if datastore.isEphemeral: + nephemeral += 1 + else: + npermanent += 1 + try: + stored_ref_map = datastore.put_new(inMemoryDataset, ref) + stored_refs.update(stored_ref_map) + nsuccess += 1 + if not datastore.isEphemeral: + isPermanent = True + except DatasetTypeNotSupportedError: + pass + + if nsuccess == 0: + raise DatasetTypeNotSupportedError(f"None of the chained datastores supported ref {ref}") + + if not isPermanent and npermanent > 0: + warnings.warn(f"Put of {ref} only succeeded in ephemeral databases", stacklevel=2) + + if self._transaction is not None: + self._transaction.registerUndo("put", self.remove, ref) + + return stored_refs + def _overrideTransferMode(self, *datasets: Any, transfer: str | None = None) -> str | None: # Docstring inherited from base class. if transfer != "auto": @@ -1130,3 +1178,10 @@ def transfer_from( raise TypeError(f"None of the child datastores could accept transfers from {source_datastore!r}") return all_accepted, remaining_refs + + def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: + # Docstring inherited from the base class. + tables: dict[str, DatastoreOpaqueTable] = {} + for datastore in self.datastores: + tables.update(datastore.get_opaque_table_definitions()) + return tables diff --git a/python/lsst/daf/butler/datastores/fileDatastore.py b/python/lsst/daf/butler/datastores/fileDatastore.py index 5f0c2a7aaf..c07862ea2b 100644 --- a/python/lsst/daf/butler/datastores/fileDatastore.py +++ b/python/lsst/daf/butler/datastores/fileDatastore.py @@ -37,7 +37,7 @@ from collections import defaultdict from collections.abc import Callable, Iterable, Mapping, Sequence from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, cast from lsst.daf.butler import ( Config, @@ -45,7 +45,6 @@ DatasetRef, DatasetType, DatasetTypeNotSupportedError, - Datastore, FileDataset, FileDescriptor, Formatter, @@ -56,7 +55,13 @@ StorageClass, ddl, ) -from lsst.daf.butler.datastore import DatasetRefURIs, DatastoreConfig, DatastoreValidationError +from lsst.daf.butler.datastore import ( + DatasetRefURIs, + Datastore, + DatastoreConfig, + DatastoreOpaqueTable, + DatastoreValidationError, +) from lsst.daf.butler.datastore.cache_manager import ( AbstractDatastoreCacheManager, DatastoreCacheManager, @@ -64,6 +69,7 @@ ) from lsst.daf.butler.datastore.composites import CompositesMap from lsst.daf.butler.datastore.file_templates import FileTemplates, FileTemplateValidationError +from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore from lsst.daf.butler.datastore.record_data import DatastoreRecordData from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo, StoredFileInfo from lsst.daf.butler.registry.interfaces import ( @@ -83,8 +89,6 @@ from lsst.utils.timer import time_this from sqlalchemy import BigInteger, String -from ..datastore.generic_base import GenericBaseDatastore - if TYPE_CHECKING: from lsst.daf.butler import LookupKey from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager @@ 
-134,7 +138,7 @@ class DatastoreFileGetInformation: """The `StorageClass` of the dataset being read.""" -class FileDatastore(GenericBaseDatastore): +class FileDatastore(GenericBaseDatastore[StoredFileInfo]): """Generic Datastore for file-based implementations. Should always be sub-classed since key abstract methods are missing. @@ -253,8 +257,6 @@ def __init__( if "root" not in self.config: raise ValueError("No root directory specified in configuration") - self._bridgeManager = bridgeManager - # Name ourselves either using an explicit name or a name # derived from the (unexpanded) root if "name" in self.config: @@ -282,11 +284,11 @@ def __init__( # See if composites should be disassembled self.composites = CompositesMap(self.config["composites"], universe=bridgeManager.universe) - tableName = self.config["records", "table"] + self._opaque_table_name = self.config["records", "table"] try: # Storage of paths and formatters, keyed by dataset_id self._table = bridgeManager.opaque.register( - tableName, self.makeTableSpec(bridgeManager.datasetIdColumnType) + self._opaque_table_name, self.makeTableSpec(bridgeManager.datasetIdColumnType) ) # Interface to Registry. self._bridge = bridgeManager.register(self.name) @@ -295,7 +297,7 @@ def __init__( # create a table, it means someone is trying to create a read-only # butler client for an empty repo. That should be okay, as long # as they then try to get any datasets before some other client - # creates the table. Chances are they'rejust validating + # creates the table. Chances are they're just validating # configuration. pass @@ -380,8 +382,24 @@ def addStoredItemInfo( infos: Iterable[StoredFileInfo], insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, ) -> None: - # Docstring inherited from GenericBaseDatastore - records = [info.rebase(ref).to_record() for ref, info in zip(refs, infos, strict=True)] + """Record internal storage information associated with one or more + datasets. + + Parameters + ---------- + refs : sequence of `DatasetRef` + The datasets that have been stored. + infos : sequence of `StoredDatastoreItemInfo` + Metadata associated with the stored datasets. + insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode` + Mode to use to insert the new records into the table. The + options are ``INSERT`` (error if pre-existing), ``REPLACE`` + (replace content with new values), and ``ENSURE`` (skip if the row + already exists). + """ + records = [ + info.rebase(ref).to_record(dataset_id=ref.id) for ref, info in zip(refs, infos, strict=True) + ] match insert_mode: case DatabaseInsertMode.INSERT: self._table.insert(*records, transaction=self._transaction) @@ -392,16 +410,81 @@ def addStoredItemInfo( case _: raise ValueError(f"Unknown insert mode of '{insert_mode}'") - def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]: - # Docstring inherited from GenericBaseDatastore + def getStoredItemsInfo( + self, ref: DatasetIdRef, ignore_datastore_records: bool = False + ) -> list[StoredFileInfo]: + """Retrieve information associated with files stored in this + `Datastore` associated with this dataset ref. + + Parameters + ---------- + ref : `DatasetRef` + The dataset that is to be queried. + ignore_datastore_records : `bool` + If `True` then do not use datastore records stored in refs. + + Returns + ------- + items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`] + Stored information about the files and associated formatters + associated with this dataset. 
Only one file will be returned + if the dataset has not been disassembled. Can return an empty + list if no matching datasets can be found. + """ + # Try to get them from the ref first. + if ref._datastore_records is not None and not ignore_datastore_records: + if (ref_records := ref._datastore_records.get(self._table.name)) is not None: + # Need to make sure they have correct type. + for record in ref_records: + if not isinstance(record, StoredFileInfo): + raise TypeError(f"Datastore record has unexpected type {record.__class__.__name__}") + return cast(list[StoredFileInfo], ref_records) # Look for the dataset_id -- there might be multiple matches # if we have disassembled the dataset. records = self._table.fetch(dataset_id=ref.id) return [StoredFileInfo.from_record(record) for record in records] + def _register_datasets( + self, + refsAndInfos: Iterable[tuple[DatasetRef, StoredFileInfo]], + insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, + ) -> None: + """Update registry to indicate that one or more datasets have been + stored. + + Parameters + ---------- + refsAndInfos : sequence `tuple` [`DatasetRef`, + `StoredDatastoreItemInfo`] + Datasets to register and the internal datastore metadata associated + with them. + insert_mode : `str`, optional + Indicate whether the new records should be new ("insert", default), + or allowed to exists ("ensure") or be replaced if already present + ("replace"). + """ + expandedRefs: list[DatasetRef] = [] + expandedItemInfos: list[StoredFileInfo] = [] + + for ref, itemInfo in refsAndInfos: + expandedRefs.append(ref) + expandedItemInfos.append(itemInfo) + + # Dataset location only cares about registry ID so if we have + # disassembled in datastore we have to deduplicate. Since they + # will have different datasetTypes we can't use a set + registryRefs = {r.id: r for r in expandedRefs} + if insert_mode == DatabaseInsertMode.INSERT: + self.bridge.insert(registryRefs.values()) + else: + # There are only two columns and all that matters is the + # dataset ID. + self.bridge.ensure(registryRefs.values()) + self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode) + def _get_stored_records_associated_with_refs( - self, refs: Iterable[DatasetIdRef] + self, refs: Iterable[DatasetIdRef], ignore_datastore_records: bool = False ) -> dict[DatasetId, list[StoredFileInfo]]: """Retrieve all records associated with the provided refs. @@ -409,6 +492,8 @@ def _get_stored_records_associated_with_refs( ---------- refs : iterable of `DatasetIdRef` The refs for which records are to be retrieved. + ignore_datastore_records : `bool` + If `True` then do not use datastore records stored in refs. Returns ------- @@ -416,11 +501,27 @@ def _get_stored_records_associated_with_refs( The matching records indexed by the ref ID. The number of entries in the dict can be smaller than the number of requested refs. """ - records = self._table.fetch(dataset_id=[ref.id for ref in refs]) + # Check datastore records in refs first. + records_by_ref: defaultdict[DatasetId, list[StoredFileInfo]] = defaultdict(list) + refs_with_no_records = [] + for ref in refs: + if ignore_datastore_records or ref._datastore_records is None: + refs_with_no_records.append(ref) + else: + if (ref_records := ref._datastore_records.get(self._table.name)) is not None: + # Need to make sure they have correct type. 
+ for ref_record in ref_records: + if not isinstance(ref_record, StoredFileInfo): + raise TypeError( + f"Datastore record has unexpected type {ref_record.__class__.__name__}" + ) + records_by_ref[ref.id].append(ref_record) + + # If there were any refs without datastore records, check opaque table. + records = self._table.fetch(dataset_id=[ref.id for ref in refs_with_no_records]) # Uniqueness is dataset_id + component so can have multiple records # per ref. - records_by_ref = defaultdict(list) for record in records: records_by_ref[record["dataset_id"]].append(StoredFileInfo.from_record(record)) return records_by_ref @@ -462,10 +563,21 @@ def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[Datase return ids def removeStoredItemInfo(self, ref: DatasetIdRef) -> None: - # Docstring inherited from GenericBaseDatastore + """Remove information about the file associated with this dataset. + + Parameters + ---------- + ref : `DatasetRef` + The dataset that has been removed. + """ + # Note that this method is actually not used by this implementation, + # we depend on bridge to delete opaque records. But there are some + # tests that check that this method works, so we keep it for now. self._table.delete(["dataset_id"], {"dataset_id": ref.id}) - def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, StoredFileInfo]]: + def _get_dataset_locations_info( + self, ref: DatasetIdRef, ignore_datastore_records: bool = False + ) -> list[tuple[Location, StoredFileInfo]]: r"""Find all the `Location`\ s of the requested dataset in the `Datastore` and the associated stored file information. @@ -473,6 +585,8 @@ def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, ---------- ref : `DatasetRef` Reference to the required `Dataset`. + ignore_datastore_records : `bool` + If `True` then do not use datastore records stored in refs. Returns ------- @@ -481,7 +595,7 @@ def _get_dataset_locations_info(self, ref: DatasetIdRef) -> list[tuple[Location, stored information about each file and its formatter. """ # Get the file information (this will fail if no file) - records = self.getStoredItemsInfo(ref) + records = self.getStoredItemsInfo(ref, ignore_datastore_records) # Use the path to determine the location -- we need to take # into account absolute URIs in the datastore record @@ -583,7 +697,6 @@ def _get_expected_dataset_locations_info(self, ref: DatasetRef) -> list[tuple[Lo component=component, checksum=None, file_size=-1, - dataset_id=ref.id, ), ) for location, formatter, storageClass, component in all_info @@ -1013,7 +1126,6 @@ def _extractIngestInfo( component=ref.datasetType.component(), file_size=size, checksum=checksum, - dataset_id=ref.id, ) def _prepIngest(self, *datasets: FileDataset, transfer: str | None = None) -> _IngestPrepData: @@ -1416,7 +1528,9 @@ def knows(self, ref: DatasetRef) -> bool: exists : `bool` `True` if the dataset is known to the datastore. """ - fileLocations = self._get_dataset_locations_info(ref) + # We cannot trust datastore records from ref, as many unit tests delete + # datasets and check their existence. + fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True) if fileLocations: return True return False @@ -1425,7 +1539,7 @@ def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]: # Docstring inherited from the base class. # The records themselves. Could be missing some entries. 
- records = self._get_stored_records_associated_with_refs(refs) + records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) return {ref: ref.id in records for ref in refs} @@ -1650,7 +1764,9 @@ def _mexists( requested_ids = set(id_to_ref.keys()) # The records themselves. Could be missing some entries. - records = self._get_stored_records_associated_with_refs(id_to_ref.values()) + records = self._get_stored_records_associated_with_refs( + id_to_ref.values(), ignore_datastore_records=True + ) dataset_existence = self._process_mexists_records( id_to_ref, records, True, artifact_existence=artifact_existence @@ -1746,7 +1862,9 @@ def exists(self, ref: DatasetRef) -> bool: though it is present in the local cache. """ ref = self._cast_storage_class(ref) - fileLocations = self._get_dataset_locations_info(ref) + # We cannot trust datastore records from ref, as many unit tests delete + # datasets and check their existence. + fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True) # if we are being asked to trust that registry might not be correct # we ask for the expected locations and check them explicitly @@ -2281,7 +2399,7 @@ def get( # be looking at the composite ref itself. cache_ref = ref.makeCompositeRef() if isComponent else ref - # For a disassembled component we can validate parametersagainst + # For a disassembled component we can validate parameters against # the component storage class directly if isDisassembled: refStorageClass.validateParameters(parameters) @@ -2348,6 +2466,38 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None: self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT) + @transactional + def put_new(self, inMemoryDataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]: + doDisassembly = self.composites.shouldBeDisassembled(ref) + # doDisassembly = True + + artifacts = [] + if doDisassembly: + components = ref.datasetType.storageClass.delegate().disassemble(inMemoryDataset) + if components is None: + raise RuntimeError( + f"Inconsistent configuration: dataset type {ref.datasetType.name} " + f"with storage class {ref.datasetType.storageClass.name} " + "is configured to be disassembled, but cannot be." + ) + for component, componentInfo in components.items(): + # Don't recurse because we want to take advantage of + # bulk insert -- need a new DatasetRef that refers to the + # same dataset_id but has the component DatasetType + # DatasetType does not refer to the types of components + # So we construct one ourselves. + compRef = ref.makeComponentRef(component) + storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef) + artifacts.append((compRef, storedInfo)) + else: + # Write the entire thing out + storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref) + artifacts.append((ref, storedInfo)) + + ref_records = {self._opaque_table_name: [info for _, info in artifacts]} + ref = ref.replace(datastore_records=ref_records) + return {self.name: ref} + @transactional def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None: # At this point can safely remove these datasets from the cache @@ -2368,7 +2518,7 @@ def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = Tr # Determine which datasets are known to datastore directly. 
id_to_ref = {ref.id: ref for ref in refs} - existing_ids = self._get_stored_records_associated_with_refs(refs) + existing_ids = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) existing_refs = {id_to_ref[ref_id] for ref_id in existing_ids} missing = refs - existing_refs @@ -2613,7 +2763,9 @@ def transfer_from( # What we really want is all the records in the source datastore # associated with these refs. Or derived ones if they don't exist # in the source. - source_records = source_datastore._get_stored_records_associated_with_refs(refs) + source_records = source_datastore._get_stored_records_associated_with_refs( + refs, ignore_datastore_records=True + ) # The source dataset_ids are the keys in these records source_ids = set(source_records) @@ -2688,7 +2840,7 @@ def transfer_from( source_records[missing].extend(dataset_records) # See if we already have these records - target_records = self._get_stored_records_associated_with_refs(refs) + target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) # The artifacts to register artifacts = [] @@ -2993,12 +3145,12 @@ def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None: # TODO: Verify that there are no unexpected table names in the dict? unpacked_records = [] - for dataset_data in record_data.records.values(): + for dataset_id, dataset_data in record_data.records.items(): records = dataset_data.get(self._table.name) if records: for info in records: assert isinstance(info, StoredFileInfo), "Expecting StoredFileInfo records" - unpacked_records.append(info.to_record()) + unpacked_records.append(info.to_record(dataset_id=dataset_id)) if unpacked_records: self._table.insert(*unpacked_records, transaction=self._transaction) @@ -3009,7 +3161,7 @@ def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, Datastore records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {id: {} for id in ids} for row in self._table.fetch(dataset_id=ids): info: StoredDatastoreItemInfo = StoredFileInfo.from_record(row) - dataset_records = records.setdefault(info.dataset_id, {}) + dataset_records = records.setdefault(row["dataset_id"], {}) dataset_records.setdefault(self._table.name, []).append(info) record_data = DatastoreRecordData(records=records) @@ -3029,3 +3181,7 @@ def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef: if dataset_type is not None: ref = ref.overrideStorageClass(dataset_type.storageClass) return ref + + def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: + # Docstring inherited from the base class. 
+ return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(ddl.GUID), StoredFileInfo)} diff --git a/python/lsst/daf/butler/datastores/inMemoryDatastore.py b/python/lsst/daf/butler/datastores/inMemoryDatastore.py index 565c1dc4db..6aa9474e0d 100644 --- a/python/lsst/daf/butler/datastores/inMemoryDatastore.py +++ b/python/lsst/daf/butler/datastores/inMemoryDatastore.py @@ -40,30 +40,26 @@ from lsst.daf.butler import DatasetId, DatasetRef, StorageClass from lsst.daf.butler.datastore import DatasetRefURIs +from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore from lsst.daf.butler.datastore.record_data import DatastoreRecordData from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo -from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridge from lsst.daf.butler.utils import transactional from lsst.resources import ResourcePath -from ..datastore.generic_base import GenericBaseDatastore -from ..registry.interfaces import DatabaseInsertMode - if TYPE_CHECKING: from lsst.daf.butler import Config, DatasetType, LookupKey + from lsst.daf.butler.datastore import DatastoreOpaqueTable from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager log = logging.getLogger(__name__) -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class StoredMemoryItemInfo(StoredDatastoreItemInfo): """Internal InMemoryDatastore Metadata associated with a stored DatasetRef. """ - __slots__ = {"timestamp", "storageClass", "parentID", "dataset_id"} - timestamp: float """Unix timestamp indicating the time the dataset was stored.""" @@ -76,11 +72,8 @@ class StoredMemoryItemInfo(StoredDatastoreItemInfo): virtual component of a composite """ - dataset_id: DatasetId - """DatasetId associated with this record.""" - -class InMemoryDatastore(GenericBaseDatastore): +class InMemoryDatastore(GenericBaseDatastore[StoredMemoryItemInfo]): """Basic Datastore for writing to an in memory cache. This datastore is ephemeral in that the contents of the datastore @@ -139,7 +132,7 @@ def __init__( # Related records that share the same parent self.related: dict[DatasetId, set[DatasetId]] = {} - self._bridge = bridgeManager.register(self.name, ephemeral=True) + self._trashedIds: set[DatasetId] = set() @classmethod def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None: @@ -176,42 +169,37 @@ def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool """ return - @property - def bridge(self) -> DatastoreRegistryBridge: - # Docstring inherited from GenericBaseDatastore. - return self._bridge - - def addStoredItemInfo( - self, - refs: Iterable[DatasetRef], - infos: Iterable[StoredMemoryItemInfo], - insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, - ) -> None: - # Docstring inherited from GenericBaseDatastore. - for ref, info in zip(refs, infos, strict=True): - self.records[ref.id] = info - self.related.setdefault(info.parentID, set()).add(ref.id) - - def getStoredItemInfo(self, ref: DatasetIdRef) -> StoredMemoryItemInfo: + def _get_stored_item_info(self, dataset_id: DatasetId) -> StoredMemoryItemInfo: # Docstring inherited from GenericBaseDatastore. - return self.records[ref.id] + return self.records[dataset_id] - def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredMemoryItemInfo]: - # Docstring inherited from GenericBaseDatastore. 
- return [self.getStoredItemInfo(ref)] - - def removeStoredItemInfo(self, ref: DatasetIdRef) -> None: + def _remove_stored_item_info(self, dataset_id: DatasetId) -> None: # Docstring inherited from GenericBaseDatastore. # If a component has been removed previously then we can sometimes # be asked to remove it again. Other datastores ignore this # so also ignore here - if ref.id not in self.records: + if dataset_id not in self.records: return - record = self.records[ref.id] - del self.records[ref.id] - self.related[record.parentID].remove(ref.id) + record = self.records[dataset_id] + del self.records[dataset_id] + self.related[record.parentID].remove(dataset_id) + + def removeStoredItemInfo(self, ref: DatasetIdRef) -> None: + """Remove information about the file associated with this dataset. + + Parameters + ---------- + ref : `DatasetRef` + The dataset that has been removed. + + Notes + ----- + This method is actually not used by this implementation, but there are + some tests that check that this method works, so we keep it for now. + """ + self._remove_stored_item_info(ref.id) - def _get_dataset_info(self, ref: DatasetIdRef) -> tuple[DatasetId, StoredMemoryItemInfo]: + def _get_dataset_info(self, dataset_id: DatasetId) -> tuple[DatasetId, StoredMemoryItemInfo]: """Check that the dataset is present and return the real ID and associated information. @@ -234,15 +222,15 @@ def _get_dataset_info(self, ref: DatasetIdRef) -> tuple[DatasetId, StoredMemoryI Raised if the dataset is not present in this datastore. """ try: - storedItemInfo = self.getStoredItemInfo(ref) + storedItemInfo = self._get_stored_item_info(dataset_id) except KeyError: - raise FileNotFoundError(f"No such file dataset in memory: {ref}") from None - realID = ref.id + raise FileNotFoundError(f"No such file dataset in memory: {dataset_id}") from None + realID = dataset_id if storedItemInfo.parentID is not None: realID = storedItemInfo.parentID if realID not in self.datasets: - raise FileNotFoundError(f"No such file dataset in memory: {ref}") + raise FileNotFoundError(f"No such file dataset in memory: {dataset_id}") return realID, storedItemInfo @@ -278,7 +266,7 @@ def exists(self, ref: DatasetRef) -> bool: `True` if the entity exists in the `Datastore`. """ try: - self._get_dataset_info(ref) + self._get_dataset_info(ref.id) except FileNotFoundError: return False return True @@ -321,7 +309,7 @@ def get( """ log.debug("Retrieve %s from %s with parameters %s", ref, self.name, parameters) - realID, storedItemInfo = self._get_dataset_info(ref) + realID, storedItemInfo = self._get_dataset_info(ref.id) # We have a write storage class and a read storage class and they # can be different for concrete composites or if overridden. @@ -405,18 +393,23 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None: # Store time we received this content, to allow us to optionally # expire it. Instead of storing a filename here, we include the # ID of this datasetRef so we can find it from components. - itemInfo = StoredMemoryItemInfo( - time.time(), ref.datasetType.storageClass, parentID=ref.id, dataset_id=ref.id - ) + itemInfo = StoredMemoryItemInfo(time.time(), ref.datasetType.storageClass, parentID=ref.id) # We have to register this content with registry. 
# Currently this assumes we have a file so we need to use stub entries - # TODO: Add to ephemeral part of registry - self._register_datasets([(ref, itemInfo)]) + self.records[ref.id] = itemInfo + self.related.setdefault(itemInfo.parentID, set()).add(ref.id) if self._transaction is not None: self._transaction.registerUndo("put", self.remove, ref) + def put_new(self, inMemoryDataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]: + # It is OK to call put() here because registry is not populating + # bridges as we return empty dict from this method. + self.put(inMemoryDataset, ref) + # As ephemeral we return empty dict. + return {} + def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs: """Return URIs associated with dataset. @@ -451,7 +444,7 @@ def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs: name = f"{ref.datasetType.name}" fragment = "#predicted" else: - realID, _ = self._get_dataset_info(ref) + realID, _ = self._get_dataset_info(ref.id) name = f"{id(self.datasets[realID])}?{query}" fragment = "" @@ -517,9 +510,8 @@ def retrieveArtifacts( def forget(self, refs: Iterable[DatasetRef]) -> None: # Docstring inherited. refs = list(refs) - self._bridge.forget(refs) for ref in refs: - self.removeStoredItemInfo(ref) + self._remove_stored_item_info(ref.id) @transactional def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = False) -> None: @@ -544,26 +536,30 @@ def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = Fa since all internal changes are isolated to solely this process and the registry only changes rows associated with this process. """ - if not isinstance(ref, DatasetRef): + if isinstance(ref, DatasetRef): + # Check that this dataset is known to datastore + try: + self._get_dataset_info(ref.id) + except Exception as e: + if ignore_errors: + log.warning( + "Error encountered moving dataset %s to trash in datastore %s: %s", ref, self.name, e + ) + else: + raise + log.debug("Trash %s in datastore %s", ref, self.name) + ref_list = [ref] + else: + ref_list = list(ref) log.debug("Bulk trashing of datasets in datastore %s", self.name) - self.bridge.moveToTrash(ref, transaction=self._transaction) - return - log.debug("Trash %s in datastore %s", ref, self.name) + def _rollbackMoveToTrash(refs: Iterable[DatasetIdRef]) -> None: + for ref in refs: + self._trashedIds.remove(ref.id) - # Check that this dataset is known to datastore - try: - self._get_dataset_info(ref) - - # Move datasets to trash table - self.bridge.moveToTrash([ref], transaction=self._transaction) - except Exception as e: - if ignore_errors: - log.warning( - "Error encountered moving dataset %s to trash in datastore %s: %s", ref, self.name, e - ) - else: - raise + assert self._transaction is not None, "Must be in transaction" + with self._transaction.undoWith(f"Trash {len(ref_list)} datasets", _rollbackMoveToTrash, ref_list): + self._trashedIds.update(ref.id for ref in ref_list) def emptyTrash(self, ignore_errors: bool = False) -> None: """Remove all datasets from the trash. @@ -584,36 +580,38 @@ def emptyTrash(self, ignore_errors: bool = False) -> None: the registry only changes rows associated with this process. 
""" log.debug("Emptying trash in datastore %s", self.name) - with self._bridge.emptyTrash() as trash_data: - trashed, _ = trash_data - for ref, _ in trashed: - try: - realID, _ = self._get_dataset_info(ref) - except FileNotFoundError: - # Dataset already removed so ignore it + + for dataset_id in self._trashedIds: + try: + realID, _ = self._get_dataset_info(dataset_id) + except FileNotFoundError: + # Dataset already removed so ignore it + continue + except Exception as e: + if ignore_errors: + log.warning( + "Emptying trash in datastore %s but encountered an error with dataset %s: %s", + self.name, + dataset_id, + e, + ) continue - except Exception as e: - if ignore_errors: - log.warning( - "Emptying trash in datastore %s but encountered an error with dataset %s: %s", - self.name, - ref.id, - e, - ) - continue - else: - raise - - # Determine whether all references to this dataset have been - # removed and we can delete the dataset itself - allRefs = self.related[realID] - remainingRefs = allRefs - {ref.id} - if not remainingRefs: - log.debug("Removing artifact %s from datastore %s", realID, self.name) - del self.datasets[realID] - - # Remove this entry - self.removeStoredItemInfo(ref) + else: + raise + + # Determine whether all references to this dataset have been + # removed and we can delete the dataset itself + allRefs = self.related[realID] + remainingRefs = allRefs - {dataset_id} + if not remainingRefs: + log.debug("Removing artifact %s from datastore %s", realID, self.name) + del self.datasets[realID] + + # Remove this entry + self._remove_stored_item_info(dataset_id) + + # Empty the trash table + self._trashedIds = set() def validateConfiguration( self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False @@ -670,3 +668,7 @@ def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, Datastore # In-memory Datastore records cannot be exported or imported return {} + + def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: + # Docstring inherited from the base class. + return {} diff --git a/python/lsst/daf/butler/registries/remote.py b/python/lsst/daf/butler/registries/remote.py index 4e1be429b1..94653119ef 100644 --- a/python/lsst/daf/butler/registries/remote.py +++ b/python/lsst/daf/butler/registries/remote.py @@ -70,6 +70,7 @@ if TYPE_CHECKING: from .._butler_config import ButlerConfig + from ..datastore._datastore import DatastoreOpaqueTable from ..registry._registry import CollectionArgType from ..registry.interfaces import CollectionRecord, DatastoreRegistryBridgeManager @@ -667,6 +668,19 @@ def queryDatasetAssociations( # Docstring inherited from lsst.daf.butler.registry.Registry raise NotImplementedError() + def get_datastore_records(self, ref: DatasetRef) -> DatasetRef: + # Docstring inherited from base class. + # For now this does not do anything and just returns the ref. + return ref + + def store_datastore_records(self, refs: Mapping[str, DatasetRef]) -> None: + # Docstring inherited from base class. + return + + def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> None: + # Docstring inherited from base class. + return + storageClasses: StorageClassFactory """All storage classes known to the registry (`StorageClassFactory`). 
""" diff --git a/python/lsst/daf/butler/registries/sql.py b/python/lsst/daf/butler/registries/sql.py index f2c769aef9..f1eedccaa9 100644 --- a/python/lsst/daf/butler/registries/sql.py +++ b/python/lsst/daf/butler/registries/sql.py @@ -81,13 +81,15 @@ _ButlerRegistry, queries, ) -from ..registry.interfaces import ChainedCollectionRecord, RunRecord +from ..registry.interfaces import ChainedCollectionRecord, ReadOnlyDatabaseError, RunRecord from ..registry.managers import RegistryManagerInstances, RegistryManagerTypes from ..registry.wildcards import CollectionWildcard, DatasetTypeWildcard from ..utils import transactional if TYPE_CHECKING: from .._butler_config import ButlerConfig + from ..datastore._datastore import DatastoreOpaqueTable + from ..datastore.stored_file_info import StoredDatastoreItemInfo from ..registry._registry import CollectionArgType from ..registry.interfaces import ( CollectionRecord, @@ -219,6 +221,11 @@ def __init__(self, database: Database, defaults: RegistryDefaults, managers: Reg # initialized, and must be done before the property getter is used. self.defaults = defaults + # TODO: This is currently initialized by `make_datastore_tables`, + # eventually we'll need to do it during construction. + # The mapping is indexed by the opaque table name. + self._datastore_record_classes: Mapping[str, type[StoredDatastoreItemInfo]] = {} + def __str__(self) -> str: return str(self._db) @@ -450,6 +457,7 @@ def findDataset( *, collections: CollectionArgType | None = None, timespan: Timespan | None = None, + datastore_records: bool = False, **kwargs: Any, ) -> DatasetRef | None: # Docstring inherited from lsst.daf.butler.registry.Registry @@ -544,6 +552,9 @@ def findDataset( ref = reader.read(best_row, data_id=dataId) if component is not None: ref = ref.makeComponentRef(component) + if datastore_records: + ref = self.get_datastore_records(ref) + return ref @transactional @@ -1339,6 +1350,48 @@ def queryDatasetAssociations( timespan = None yield DatasetAssociation(ref=ref, collection=collection_record.name, timespan=timespan) + def get_datastore_records(self, ref: DatasetRef) -> DatasetRef: + # Docstring inherited from base class. + + datastore_records: dict[str, list[StoredDatastoreItemInfo]] = {} + for opaque, record_class in self._datastore_record_classes.items(): + records = self.fetchOpaqueData(opaque, dataset_id=ref.id) + datastore_records[opaque] = [record_class.from_record(record) for record in records] + return ref.replace(datastore_records=datastore_records) + + def store_datastore_records(self, refs: Mapping[str, DatasetRef]) -> None: + # Docstring inherited from base class. + + for datastore_name, ref in refs.items(): + # Store ref IDs in the bridge table. + bridge = self._managers.datastores.register(datastore_name) + bridge.insert([ref]) + + # store records in opaque tables + assert ref._datastore_records is not None, "Dataset ref must have datastore records" + for table_name, records in ref._datastore_records.items(): + opaque_table = self._managers.opaque.get(table_name) + assert opaque_table is not None, f"Unexpected opaque table name {table_name}" + opaque_table.insert(*(record.to_record(dataset_id=ref.id) for record in records)) + + def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> None: + # Docstring inherited from base class. 
+
+        datastore_record_classes = {}
+        for table_name, table_def in tables.items():
+            datastore_record_classes[table_name] = table_def.record_class
+            try:
+                self._managers.opaque.register(table_name, table_def.table_spec)
+            except ReadOnlyDatabaseError:
+                # If the database is read only and we just tried and failed
+                # to create a table, it means someone is trying to create a
+                # read-only butler client for an empty repo. That should be
+                # okay, as long as they don't try to get any datasets before
+                # some other client creates the table. Chances are they're
+                # just validating configuration.
+                pass
+        self._datastore_record_classes = datastore_record_classes
+
     @property
     def obsCoreTableManager(self) -> ObsCoreTableManager | None:
         # Docstring inherited from lsst.daf.butler.registry.Registry
diff --git a/python/lsst/daf/butler/registry/_butler_registry.py b/python/lsst/daf/butler/registry/_butler_registry.py
index a0f709f4a7..96b3128373 100644
--- a/python/lsst/daf/butler/registry/_butler_registry.py
+++ b/python/lsst/daf/butler/registry/_butler_registry.py
@@ -30,6 +30,7 @@ __all__ = ("_ButlerRegistry",)
 
 from abc import abstractmethod
+from collections.abc import Mapping
 from typing import TYPE_CHECKING
 
 from lsst.resources import ResourcePathExpression
@@ -42,6 +43,8 @@ if TYPE_CHECKING:
     from .._butler_config import ButlerConfig
+    from .._dataset_ref import DatasetRef
+    from ..datastore import DatastoreOpaqueTable
     from .interfaces import CollectionRecord, DatastoreRegistryBridgeManager
@@ -216,3 +219,53 @@ def getDatastoreBridgeManager(self) -> DatastoreRegistryBridgeManager:
         associated datastores.
         """
         raise NotImplementedError()
+
+    @abstractmethod
+    def get_datastore_records(self, ref: DatasetRef) -> DatasetRef:
+        """Retrieve datastore records for a given ref.
+
+        Parameters
+        ----------
+        ref : `DatasetRef`
+            Dataset reference for which to retrieve the corresponding
+            datastore records.
+
+        Returns
+        -------
+        updated_ref : `DatasetRef`
+            Dataset reference with its datastore records filled in.
+
+        Notes
+        -----
+        If this method is called with a dataset ref that is not known to
+        the registry, a reference with an empty set of records is returned.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def store_datastore_records(self, refs: Mapping[str, DatasetRef]) -> None:
+        """Store datastore records for the given refs.
+
+        Parameters
+        ----------
+        refs : `~collections.abc.Mapping` [`str`, `DatasetRef`]
+            Mapping of datastore name to a dataset reference stored in that
+            datastore; the reference must include datastore records.
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def make_datastore_tables(self, tables: Mapping[str, DatastoreOpaqueTable]) -> None:
+        """Create opaque tables used by datastores.
+
+        Parameters
+        ----------
+        tables : `~collections.abc.Mapping`
+            Maps opaque table name to its definition.
+
+        Notes
+        -----
+        This method should disappear in the future, once opaque table
+        definitions are provided during `Registry` construction.
+        """
+        raise NotImplementedError()
diff --git a/python/lsst/daf/butler/registry/_registry.py b/python/lsst/daf/butler/registry/_registry.py
index 30b490aebe..a3da58f15e 100644
--- a/python/lsst/daf/butler/registry/_registry.py
+++ b/python/lsst/daf/butler/registry/_registry.py
@@ -521,6 +521,7 @@ def findDataset(
         *,
         collections: CollectionArgType | None = None,
         timespan: Timespan | None = None,
+        datastore_records: bool = False,
         **kwargs: Any,
     ) -> DatasetRef | None:
         """Find a dataset given its `DatasetType` and data ID.
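Taken together, the registry-side changes above let a caller ask `findDataset` to attach opaque-table records directly to the returned ref. A minimal sketch of the intended call sequence follows; the repository path, collection, dataset type name, and data ID values are hypothetical placeholders, and only APIs touched by this patch are exercised:

```python
# Sketch only (not part of the patch): exercising the new datastore-records
# plumbing on the registry. Repo path, collection, and dataset type names
# are hypothetical.
from lsst.daf.butler import Butler

butler = Butler("/path/to/repo", collections=["demo/run"])

# findDataset() can now attach datastore records to the returned ref.
ref = butler.registry.findDataset(
    "metric",              # hypothetical dataset type with instrument+visit dims
    instrument="DummyCam",
    visit=42,
    datastore_records=True,
)
if ref is not None and ref._datastore_records is not None:
    # Records are grouped by opaque table name; usually there is one table.
    for table_name, records in ref._datastore_records.items():
        print(table_name, len(list(records)))
```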
diff --git a/python/lsst/daf/butler/registry/interfaces/_bridge.py b/python/lsst/daf/butler/registry/interfaces/_bridge.py index 3385be21a3..ccbc942d6e 100644 --- a/python/lsst/daf/butler/registry/interfaces/_bridge.py +++ b/python/lsst/daf/butler/registry/interfaces/_bridge.py @@ -39,6 +39,7 @@ from ._versioning import VersionedExtension, VersionTuple if TYPE_CHECKING: + from ..._dataset_ref import DatasetDatastoreRecords from ..._dataset_type import DatasetType from ...datastore import DatastoreTransaction from ...datastore.stored_file_info import StoredDatastoreItemInfo @@ -91,6 +92,10 @@ def __hash__(self) -> int: def datasetType(self) -> DatasetType: raise AttributeError("A FakeDatasetRef can not be associated with a valid DatasetType") + @property + def _datastore_records(self) -> DatasetDatastoreRecords | None: + raise AttributeError("A FakeDatasetRef can not be associated with datastore records") + DatasetIdRef = DatasetRef | FakeDatasetRef """A type-annotation alias that matches both `DatasetRef` and `FakeDatasetRef`. diff --git a/tests/test_datasets.py b/tests/test_datasets.py index 50ef705535..1d638b1e08 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -39,6 +39,7 @@ StorageClass, StorageClassFactory, ) +from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo """Tests for datasets module. """ @@ -522,12 +523,34 @@ def setUp(self) -> None: self.parentStorageClass = StorageClass( "Parent", components={"a": self.componentStorageClass1, "b": self.componentStorageClass2} ) + sc_factory = StorageClassFactory() + sc_factory.registerStorageClass(self.componentStorageClass1) + sc_factory.registerStorageClass(self.componentStorageClass2) + sc_factory.registerStorageClass(self.parentStorageClass) dimensions = self.universe.extract(("instrument", "visit")) self.dataId = DataCoordinate.standardize( dict(instrument="DummyCam", visit=42), universe=self.universe ) self.datasetType = DatasetType(datasetTypeName, dimensions, self.parentStorageClass) + def _make_datastore_records(self, ref: DatasetRef, *paths: str) -> DatasetRef: + """Return an updated dataset ref with datastore records.""" + opaque_table_name = "datastore_records" + datastore_records = { + opaque_table_name: [ + StoredFileInfo( + formatter="", + path=path, + storageClass=ref.datasetType.storageClass, + component=None, + checksum=None, + file_size=1, + ) + for path in paths + ] + } + return ref.replace(datastore_records=datastore_records) + def testConstructor(self) -> None: """Test that construction preserves and validates values.""" # Constructing a ref requires a run. 
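The `_make_datastore_records` helper above illustrates the reworked `StoredFileInfo`: the dataset ID no longer lives on the info object and is only supplied when an opaque-table row is produced. A minimal sketch of that round trip, assuming a `StorageClass` instance and a resolved `DatasetRef` already exist:

```python
# Sketch only: StoredFileInfo round trip without a dataset_id attribute.
from lsst.daf.butler.datastore.stored_file_info import StoredFileInfo

info = StoredFileInfo(
    formatter="lsst.daf.butler.Formatter",
    path="a/b/c.txt",
    storageClass=storage_class,   # assumed StorageClass instance
    component=None,
    checksum=None,
    file_size=5,
)

# The dataset ID now travels with the opaque-table row instead of the info.
record = info.to_record(dataset_id=ref.id)   # ref assumed to be a resolved DatasetRef
round_tripped = StoredFileInfo.from_record(record)
```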
@@ -670,6 +693,12 @@ def testReplace(self) -> None: self.assertEqual(ref5.run, "somerun") self.assertEqual(ref5, ref) + self.assertIsNone(ref5._datastore_records) + ref5 = ref5.replace(datastore_records={}) + self.assertEqual(ref5._datastore_records, {}) + ref5 = ref5.replace(datastore_records=None) + self.assertIsNone(ref5._datastore_records) + def testPickle(self) -> None: ref = DatasetRef(self.datasetType, self.dataId, run="somerun") s = pickle.dumps(ref) @@ -680,6 +709,14 @@ def testJson(self) -> None: s = ref.to_json() self.assertEqual(DatasetRef.from_json(s, universe=self.universe), ref) + # Also test ref with datastore records + ref = self._make_datastore_records(ref, "/path1", "/path2") + s = ref.to_json() + ref2 = DatasetRef.from_json(s, universe=self.universe) + self.assertEqual(ref2, ref) + self.assertIsNotNone(ref2._datastore_records) + self.assertEqual(ref2._datastore_records, ref._datastore_records) + def testFileDataset(self) -> None: ref = DatasetRef(self.datasetType, self.dataId, run="somerun") file_dataset = FileDataset(path="something.yaml", refs=ref) diff --git a/tests/test_datastore.py b/tests/test_datastore.py index c89ae17265..d18996cfdf 100644 --- a/tests/test_datastore.py +++ b/tests/test_datastore.py @@ -1928,18 +1928,15 @@ def test_StoredFileInfo(self) -> None: formatter="lsst.daf.butler.Formatter", path="a/b/c.txt", component="component", - dataset_id=ref.id, checksum=None, file_size=5, ) info = StoredFileInfo.from_record(record) - self.assertEqual(info.dataset_id, ref.id) self.assertEqual(info.to_record(), record) ref2 = self.makeDatasetRef("metric", DimensionUniverse().extract(()), storageClass, {}) rebased = info.rebase(ref2) - self.assertEqual(rebased.dataset_id, ref2.id) self.assertEqual(rebased.rebase(ref), info) with self.assertRaises(TypeError): diff --git a/tests/test_quantumBackedButler.py b/tests/test_quantumBackedButler.py index d03166ddaa..1cf801fdf9 100644 --- a/tests/test_quantumBackedButler.py +++ b/tests/test_quantumBackedButler.py @@ -370,13 +370,11 @@ def test_extract_provenance_data(self) -> None: datastore_records = provenance.datastore_records[datastore_name] self.assertEqual(set(datastore_records.dataset_ids), output_ids) class_name = "lsst.daf.butler.datastore.stored_file_info.StoredFileInfo" - table_name = "file_datastore_records" self.assertEqual(set(datastore_records.records.keys()), {class_name}) - self.assertEqual(set(datastore_records.records[class_name].keys()), {table_name}) - self.assertEqual( - {record["dataset_id"] for record in datastore_records.records[class_name][table_name]}, - output_ids, - ) + self.assertEqual(set(datastore_records.records[class_name].keys()), {id.hex for id in output_ids}) + table_name = "file_datastore_records" + for dataset_data in datastore_records.records[class_name].values(): + self.assertEqual(set(dataset_data), {table_name}) def test_collect_and_transfer(self) -> None: """Test for collect_and_transfer method"""
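The updated quantum-backed-butler expectations reflect the per-dataset layout that `DatastoreRecordData` now uses: records are grouped by dataset ID first and by opaque table name second. A rough sketch of walking records returned by `export_records()`, under the assumption that the returned mapping is keyed by datastore name and that `datastore` and `refs` (a file-backed datastore and a list of resolved refs) already exist:

```python
# Sketch only: iterate exported datastore records under the per-dataset
# layout (dataset ID -> opaque table name -> stored item infos).
# `datastore` and `refs` are assumed to already exist.
exported = datastore.export_records(refs)
for datastore_name, record_data in exported.items():
    for dataset_id, per_table in record_data.records.items():
        for table_name, infos in per_table.items():
            print(datastore_name, dataset_id, table_name, len(list(infos)))
```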