Rename OpaqueTableDefinition to DatastoreOpaqueTable
The old name was too generic; in reality this is very datastore-specific.
I do not think we need a generic structure of the same kind, at least not
right now.
andy-slac committed Oct 11, 2023
1 parent 0119c39 commit 0492dd8
Showing 8 changed files with 185 additions and 217 deletions.
12 changes: 6 additions & 6 deletions python/lsst/daf/butler/datastore/_datastore.py
@@ -30,13 +30,13 @@
from __future__ import annotations

__all__ = (
"DatastoreConfig",
"DatasetRefURIs",
"Datastore",
"DatastoreConfig",
"DatastoreOpaqueTable",
"DatastoreValidationError",
"DatasetRefURIs",
"NullDatastore",
"DatastoreTransaction",
"OpaqueTableDefinition",
)

import contextlib
@@ -97,7 +97,7 @@ class Event:


@dataclasses.dataclass(frozen=True)
class OpaqueTableDefinition:
class DatastoreOpaqueTable:
"""Definition of the opaque table which stores datastore records.
Table definition contains `.ddl.TableSpec` for a table and a class
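For orientation, here is a minimal sketch of the renamed dataclass, inferred from the docstring above and the positional constructor call in fileDatastore.py at the end of this diff; the field names and import paths are assumptions, not necessarily the repository's exact code:

import dataclasses

from lsst.daf.butler import ddl  # import path assumed
from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo


@dataclasses.dataclass(frozen=True)
class DatastoreOpaqueTable:
    """Definition of the opaque table which stores datastore records."""

    # Field names are assumptions; the commit constructs instances
    # positionally as DatastoreOpaqueTable(table_spec, record_class).
    table_spec: ddl.TableSpec
    record_class: type[StoredDatastoreItemInfo]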
@@ -1264,7 +1264,7 @@ def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType |
pass

@abstractmethod
def opaque_table_definitions(self) -> Mapping[str, OpaqueTableDefinition]:
def opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
"""Make definitions of the opaque tables used by this Datastore.
Returns
@@ -1398,5 +1398,5 @@ def export_records(
) -> Mapping[str, DatastoreRecordData]:
raise NotImplementedError("This is a no-op datastore that can not access a real datastore")

def opaque_table_definitions(self) -> Mapping[str, OpaqueTableDefinition]:
def opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
return {}
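Callers get these definitions back as a name-to-definition mapping. A hypothetical, purely illustrative consumer (the helper itself and the `record_class` field name are assumptions):

from collections.abc import Mapping

from lsst.daf.butler.datastore import Datastore, DatastoreOpaqueTable


def describe_opaque_tables(datastore: Datastore) -> None:
    # Hypothetical helper: report which opaque tables a datastore needs and
    # the record class used to round-trip each table's rows.
    tables: Mapping[str, DatastoreOpaqueTable] = datastore.opaque_table_definitions()
    for name, table_def in tables.items():
        print(f"{name}: {table_def.record_class.__name__}")  # field name assumed

Run against the NullDatastore above, it prints nothing, since the mapping is empty.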
108 changes: 2 additions & 106 deletions python/lsst/daf/butler/datastore/generic_base.py
@@ -32,18 +32,16 @@
__all__ = ("GenericBaseDatastore",)

import logging
from abc import abstractmethod
from collections.abc import Iterable, Mapping
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from .._exceptions import DatasetTypeNotSupportedError
from ..datastore._datastore import Datastore
from ..registry.interfaces import DatabaseInsertMode, DatastoreRegistryBridge
from .stored_file_info import StoredDatastoreItemInfo

if TYPE_CHECKING:
from .._dataset_ref import DatasetRef
from .._storage_class import StorageClass
from .stored_file_info import StoredDatastoreItemInfo

log = logging.getLogger(__name__)

@@ -56,107 +54,6 @@ class GenericBaseDatastore(Datastore, Generic[_InfoType]):
Should always be sub-classed since key abstract methods are missing.
"""

@property
@abstractmethod
def bridge(self) -> DatastoreRegistryBridge:
"""Object that manages the interface between this `Datastore` and the
`Registry` (`DatastoreRegistryBridge`).
"""
raise NotImplementedError()

@abstractmethod
def addStoredItemInfo(
self,
refs: Iterable[DatasetRef],
infos: Iterable[Any],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
"""Record internal storage information associated with one or more
datasets.

Parameters
----------
refs : sequence of `DatasetRef`
The datasets that have been stored.
infos : sequence of `StoredDatastoreItemInfo`
Metadata associated with the stored datasets.
insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
Mode to use to insert the new records into the table. The
options are ``INSERT`` (error if pre-existing), ``REPLACE``
(replace content with new values), and ``ENSURE`` (skip if the row
already exists).
"""
raise NotImplementedError()

@abstractmethod
def getStoredItemsInfo(self, ref: DatasetRef) -> Iterable[_InfoType]:
"""Retrieve information associated with files stored in this
`Datastore` associated with this dataset ref.
Parameters
----------
ref : `DatasetRef`
The dataset that is to be queried.

Returns
-------
items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`]
Stored information about the files and associated formatters
for this dataset. Only one file will be returned
if the dataset has not been disassembled. Can return an empty
list if no matching datasets can be found.
"""
raise NotImplementedError()

@abstractmethod
def removeStoredItemInfo(self, ref: DatasetRef) -> None:
"""Remove information about the file associated with this dataset.

Parameters
----------
ref : `DatasetRef`
The dataset that has been removed.
"""
raise NotImplementedError()

def _register_datasets(
self,
refsAndInfos: Iterable[tuple[DatasetRef, StoredDatastoreItemInfo]],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
"""Update registry to indicate that one or more datasets have been
stored.

Parameters
----------
refsAndInfos : sequence `tuple` [`DatasetRef`,
`StoredDatastoreItemInfo`]
Datasets to register and the internal datastore metadata associated
with them.
insert_mode : `DatabaseInsertMode`, optional
Indicate whether the new records should be new ("insert", default),
allowed to already exist ("ensure"), or replaced if already present
("replace").
"""
expandedRefs: list[DatasetRef] = []
expandedItemInfos = []

for ref, itemInfo in refsAndInfos:
expandedRefs.append(ref)
expandedItemInfos.append(itemInfo)

# Dataset location only cares about registry ID, so if we have
# disassembled in the datastore we have to deduplicate. Since they
# will have different datasetTypes we can't use a set.
registryRefs = {r.id: r for r in expandedRefs}
if insert_mode == DatabaseInsertMode.INSERT:
self.bridge.insert(registryRefs.values())
else:
# There are only two columns and all that matters is the
# dataset ID.
self.bridge.ensure(registryRefs.values())
self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)

def _post_process_get(
self,
inMemoryDataset: object,
@@ -272,7 +169,6 @@ def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None:
The external `Datastore` from which to retrieve the Dataset.
ref : `DatasetRef`
Reference to the required dataset in the input data store.
"""
assert inputDatastore is not self # unless we want it for renames?
inMemoryDataset = inputDatastore.get(ref)
6 changes: 3 additions & 3 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -43,8 +43,8 @@
DatasetRefURIs,
Datastore,
DatastoreConfig,
DatastoreOpaqueTable,
DatastoreValidationError,
OpaqueTableDefinition,
)
from lsst.daf.butler.datastore.constraints import Constraints
from lsst.daf.butler.datastore.record_data import DatastoreRecordData
@@ -1179,9 +1179,9 @@ def transfer_from(

return all_accepted, remaining_refs

def opaque_table_definitions(self) -> Mapping[str, OpaqueTableDefinition]:
def opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
# Docstring inherited from the base class.
tables: dict[str, OpaqueTableDefinition] = {}
tables: dict[str, DatastoreOpaqueTable] = {}
for datastore in self.datastores:
tables.update(datastore.opaque_table_definitions())
return tables
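One consequence of the `dict.update` merge worth noting: if two child datastores ever reported an opaque table under the same name, the definition from the later datastore in the chain would silently win; presumably child datastores are expected to use distinct table names.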
85 changes: 78 additions & 7 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -45,7 +45,6 @@
DatasetRef,
DatasetType,
DatasetTypeNotSupportedError,
Datastore,
FileDataset,
FileDescriptor,
Formatter,
@@ -58,9 +57,10 @@
)
from lsst.daf.butler.datastore import (
DatasetRefURIs,
Datastore,
DatastoreConfig,
DatastoreOpaqueTable,
DatastoreValidationError,
OpaqueTableDefinition,
)
from lsst.daf.butler.datastore.cache_manager import (
AbstractDatastoreCacheManager,
@@ -382,7 +382,21 @@ def addStoredItemInfo(
infos: Iterable[StoredFileInfo],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
# Docstring inherited from GenericBaseDatastore
"""Record internal storage information associated with one or more
datasets.

Parameters
----------
refs : sequence of `DatasetRef`
The datasets that have been stored.
infos : sequence of `StoredDatastoreItemInfo`
Metadata associated with the stored datasets.
insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
Mode to use to insert the new records into the table. The
options are ``INSERT`` (error if pre-existing), ``REPLACE``
(replace content with new values), and ``ENSURE`` (skip if the row
already exists).
"""
records = [
info.rebase(ref).to_record(dataset_id=ref.id) for ref, info in zip(refs, infos, strict=True)
]
Expand All @@ -397,8 +411,22 @@ def addStoredItemInfo(
raise ValueError(f"Unknown insert mode of '{insert_mode}'")
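# The insert-mode dispatch that precedes this ValueError is elided by the
# diff view. A hypothetical reconstruction of its shape; the self._table
# method names are assumptions, not the repository's actual API:
#
#   if insert_mode is DatabaseInsertMode.INSERT:
#       self._table.insert(*records)
#   elif insert_mode is DatabaseInsertMode.REPLACE:
#       self._table.replace(*records)
#   elif insert_mode is DatabaseInsertMode.ENSURE:
#       self._table.ensure(*records)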

def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
# Docstring inherited from GenericBaseDatastore
"""Retrieve information associated with files stored in this
`Datastore` associated with this dataset ref.
Parameters
----------
ref : `DatasetRef`
The dataset that is to be queried.

Returns
-------
items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`]
Stored information about the files and associated formatters
for this dataset. Only one file will be returned
if the dataset has not been disassembled. Can return an empty
list if no matching datasets can be found.
"""
# Try to get them from the ref first.
if ref.datastore_records is not None:
if (ref_records := ref.datastore_records.get(self._table.name)) is not None:
@@ -413,6 +441,44 @@ def getStoredItemsInfo(self, ref: DatasetIdRef) -> list[StoredFileInfo]:
records = self._table.fetch(dataset_id=ref.id)
return [StoredFileInfo.from_record(record) for record in records]

def _register_datasets(
self,
refsAndInfos: Iterable[tuple[DatasetRef, StoredFileInfo]],
insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
) -> None:
"""Update registry to indicate that one or more datasets have been
stored.

Parameters
----------
refsAndInfos : sequence `tuple` [`DatasetRef`,
`StoredDatastoreItemInfo`]
Datasets to register and the internal datastore metadata associated
with them.
insert_mode : `DatabaseInsertMode`, optional
Indicate whether the new records should be new ("insert", default),
allowed to already exist ("ensure"), or replaced if already present
("replace").
"""
expandedRefs: list[DatasetRef] = []
expandedItemInfos: list[StoredFileInfo] = []

for ref, itemInfo in refsAndInfos:
expandedRefs.append(ref)
expandedItemInfos.append(itemInfo)

# Dataset location only cares about registry ID, so if we have
# disassembled in the datastore we have to deduplicate. Since they
# will have different datasetTypes we can't use a set.
registryRefs = {r.id: r for r in expandedRefs}
if insert_mode == DatabaseInsertMode.INSERT:
self.bridge.insert(registryRefs.values())
else:
# There are only two columns and all that matters is the
# dataset ID.
self.bridge.ensure(registryRefs.values())
self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)
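# Illustrative usage sketch (component names hypothetical): a composite
# disassembled into two components yields refs with different dataset
# types but the same dataset ID, so registryRefs above collapses them to
# a single bridge entry while both StoredFileInfo records are written:
#
#   self._register_datasets(
#       [
#           (ref.makeComponentRef("image"), image_info),
#           (ref.makeComponentRef("mask"), mask_info),
#       ],
#       insert_mode=DatabaseInsertMode.INSERT,
#   )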

def _get_stored_records_associated_with_refs(
self, refs: Iterable[DatasetIdRef]
) -> dict[DatasetId, list[StoredFileInfo]]:
@@ -491,8 +557,13 @@ def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[Datase
return ids

def removeStoredItemInfo(self, ref: DatasetIdRef) -> None:
# Docstring inherited from GenericBaseDatastore
"""Remove information about the file associated with this dataset.
Parameters
----------
ref : `DatasetRef`
The dataset that has been removed.
"""
# Note that this method is actually not used by this implementation,
# we depend on bridge to delete opaque records. But there are some
# tests that check that this method works, so we keep it for now.
@@ -3093,6 +3164,6 @@ def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef:
ref = ref.overrideStorageClass(dataset_type.storageClass)
return ref

def opaque_table_definitions(self) -> Mapping[str, OpaqueTableDefinition]:
def opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
# Docstring inherited from the base class.
return {self._opaque_table_name: OpaqueTableDefinition(self.makeTableSpec(ddl.GUID), StoredFileInfo)}
return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(ddl.GUID), StoredFileInfo)}