Add script for generating APDB catalogs from internal data.
This script takes a long time to run, because it simulates ApPipe execution visit by visit.
1 parent 59a65bc · commit 2db9937
Showing 2 changed files with 259 additions and 1 deletion.
@@ -0,0 +1,254 @@
#!/usr/bin/env python
# This file is part of ap_verify_ci_hits2015.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Script for simulating the APDB preloads generated by associating this
dataset's observations.

This script assumes visits are processed in ascending numerical order, with no
prior history. Prior sources must be provided through a different script.

This script requires that all other input datasets are already in preloaded/,
but does not require that export.yaml be up to date. It (or rather, the
pipeline configs) requires that the repository be set up.

This script takes roughly 10 minutes to run on rubin-devl.
"""

import glob
import logging
import os
import subprocess
import sys
import tempfile

import lsst.log
from lsst.daf.butler import Butler, CollectionType, MissingCollectionError
import lsst.obs.base
import lsst.dax.apdb
import lsst.pipe.base  # needed by _check_pipeline


logging.basicConfig(level=logging.INFO, stream=sys.stdout)
lsst.log.configure_pylog_MDC("DEBUG", MDC_class=None)


# Avoid explicit references to dataset package to maximize portability.
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
PIPE_DIR = os.path.join(SCRIPT_DIR, "..", "pipelines")
RAW_DIR = os.path.join(SCRIPT_DIR, "..", "raw")
RAW_RUN = "raw"
PRELOAD_TYPES = ["preloaded_*"]
DEST_DIR = os.path.join(SCRIPT_DIR, "..", "preloaded")
DEST_COLLECTION = "dia_catalogs"
DEST_RUN = DEST_COLLECTION + "/apdb"


########################################
# Processing steps

def _clear_preloaded(butler):
    """Remove preloaded datasets from the collection's chain.

    If it exists, ``DEST_RUN`` is removed entirely to keep it from interfering
    with the rest of this script. Other runs are merely unlinked in case they
    would still be useful.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        A writeable Butler pointing to this repository.
    """
    try:
        butler.collections.redefine_chain(DEST_COLLECTION, [])
    except MissingCollectionError:
        # No preloaded datasets to begin with
        return
    butler.removeRuns([DEST_RUN], unstore=True)


def _copy_repo_to(src_butler, repo_dir):
    """Create a repository that's a copy of this one.

    Parameters
    ----------
    src_butler : `lsst.daf.butler.Butler`
        A Butler pointing to this repository.
    repo_dir : `str`
        The directory in which to create the new repository.

    Returns
    -------
    butler : `lsst.daf.butler.Butler`
        A writeable Butler to the new repo.
    """
    # Don't use ap_verify code, to avoid dependency on potentially out-of-date export.yaml
    repo_config = Butler.makeRepo(repo_dir)
    dest_butler = Butler(repo_config, writeable=True)
    logging.debug("Temporary repo has universe version %d.", dest_butler.dimensions.version)
    # Use export/import to preserve chained and calibration collections
    with tempfile.NamedTemporaryFile(suffix=".yaml") as export_file:
        with src_butler.export(filename=export_file.name, transfer=None) as contents:
            for t in src_butler.registry.queryDatasetTypes():
                contents.saveDatasets(
                    src_butler.query_datasets(t, collections="*", find_first=False, explain=False))
            # runs and dimensions included automatically
            for coll in src_butler.collections.query("*", include_chains=True):
                contents.saveCollection(coll)
        dest_butler.import_(directory=DEST_DIR, filename=export_file.name, transfer="auto")
    return dest_butler


def _ingest_raws(repo, raw_dir, run):
    """Ingest this dataset's raws into a specific repo.

    Parameters
    ----------
    repo : `lsst.daf.butler.Butler`
        A writeable Butler for the repository to ingest into.
    raw_dir : `str`
        The directory containing raw files.
    run : `str`
        The name of the run into which to import the raws.
    """
    raws = glob.glob(os.path.join(raw_dir, '**', '*.fits*'), recursive=True)
    ingester = lsst.obs.base.RawIngestTask(butler=repo, config=lsst.obs.base.RawIngestConfig())
    ingester.run(raws, run=run)
    exposures = set(repo.registry.queryDataIds(["exposure"]))
    definer = lsst.obs.base.DefineVisitsTask(butler=repo, config=lsst.obs.base.DefineVisitsConfig())
    definer.run(exposures)


def _check_pipeline(butler):
    """Confirm that the pipeline is correctly configured.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        A Butler pointing to this repository.
    """
    pipeline_file = os.path.join(PIPE_DIR, "ApPipe.yaml")
    pipeline = lsst.pipe.base.Pipeline.fromFile(pipeline_file)
    pipeline.addConfigOverride("parameters", "apdb_config", "foo")
    # Check that the configs load correctly; raises if there's a setup missing
    pipeline.to_graph()


def _build_catalogs(repo_dir, input_collections, output_collection):
    """Simulate an AP pipeline run.

    Parameters
    ----------
    repo_dir : `str`
        The repository on which to run the task.
    input_collections : iterable [`str`]
        The collections containing inputs.
    output_collection : `str`
        The collection into which to generate preloaded catalogs.

    Raises
    ------
    RuntimeError
        Raised on any pipeline failure.
    """
    # Should be only one instrument
    butler = Butler(repo_dir)
    instrument = butler.query_data_ids("instrument")[0]["instrument"]
    visits = [coord["visit"] for coord in butler.query_data_ids("visit")]
    pipeline_file = os.path.join(PIPE_DIR, "ApPipe.yaml")

    # Create temporary APDB
    apdb_location = f"sqlite:///{repo_dir}/apdb.db"
    logging.debug("Creating apdb at %s...", apdb_location)
    apdb_config = lsst.dax.apdb.ApdbSql.init_database(db_url=apdb_location)

    with tempfile.NamedTemporaryFile(suffix=".py") as config_file:
        apdb_config.save(config_file.name)

        # Guarantee execution in observation order
        run_exists = False
        for visit in sorted(visits):
            logging.info("Generating catalogs for visit %d...", visit)
            pipeline_args = ["pipetask", "run",
                             "--butler-config", repo_dir,
                             "--pipeline", pipeline_file,
                             "--config", f"parameters:apdb_config='{config_file.name}'",
                             "--input", ",".join(input_collections),
                             # Can reuse collection as long as data IDs don't overlap
                             "--output-run", output_collection,
                             "--data-query", f"instrument='{instrument}' and visit={visit}",
                             "--processes", "6",
                             "--register-dataset-types",
                             ]
            if run_exists:
                pipeline_args.append("--extend-run")
            results = subprocess.run(pipeline_args, capture_output=False, shell=False, check=False)
            run_exists = True
            if results.returncode:
                raise RuntimeError("Pipeline failed to run; see log for details.")


def _transfer_catalogs(catalog_types, src_repo, run, dest_repo):
    """Copy preloaded catalogs between two repositories.

    Parameters
    ----------
    catalog_types : iterable [`str`]
        Query expressions for the dataset types of the preloaded catalogs.
    src_repo : `lsst.daf.butler.Butler`
        The repository from which to copy the datasets.
    run : `str`
        The name of the run containing the catalogs in both ``src_repo``
        and ``dest_repo``.
    dest_repo : `lsst.daf.butler.Butler`
        The repository to which to copy the datasets.
    """
    expanded_types = src_repo.registry.queryDatasetTypes(catalog_types)
    datasets = set()
    for t in expanded_types:
        datasets.update(src_repo.query_datasets(t, collections=run, explain=False))
    dest_repo.transfer_from(src_repo, datasets, transfer="copy",
                            register_dataset_types=True, transfer_dimensions=True)


########################################
# Put everything together

preloaded = Butler(DEST_DIR, writeable=True)
_check_pipeline(preloaded)
logging.info("Removing old catalogs...")
_clear_preloaded(preloaded)
logging.info("Creating temporary repository...")
with tempfile.TemporaryDirectory() as workspace:
    temp_repo = _copy_repo_to(preloaded, workspace)
    logging.info("Ingesting raws...")
    _ingest_raws(temp_repo, RAW_DIR, RAW_RUN)
    logging.info("Simulating DIA analysis...")
    inst_name = temp_repo.query_data_ids("instrument")[0]["instrument"]
    instrument = lsst.obs.base.Instrument.fromName(inst_name, temp_repo.registry)
    _build_catalogs(workspace, [RAW_RUN, instrument.makeUmbrellaCollectionName()], DEST_RUN)
    temp_repo.registry.refresh()  # Pipeline added dataset types
    logging.debug("Preloaded repo has universe version %d.", preloaded.dimensions.version)
    logging.info("Transferring catalogs to data set...")
    _transfer_catalogs(PRELOAD_TYPES, temp_repo, DEST_RUN, preloaded)
    preloaded.collections.register(DEST_COLLECTION, CollectionType.CHAINED)
    preloaded.collections.prepend_chain(DEST_COLLECTION, DEST_RUN)

logging.info("Preloaded APDB catalogs copied to %s:%s", DEST_DIR, DEST_COLLECTION)