From 2db9937cd6ed45992dcbde71463cdd10c8066170 Mon Sep 17 00:00:00 2001
From: Krzysztof Findeisen
Date: Thu, 10 Oct 2024 10:51:15 -0700
Subject: [PATCH] Add script for generating APDB catalogs from internal data.

This script takes a long time to run, because it simulates ApPipe execution
visit by visit.
---
 scripts/generate_self_preload.py | 254 +++++++++++++++++++++++++++++++
 scripts/make_preloaded.sh        |   6 +-
 2 files changed, 259 insertions(+), 1 deletion(-)
 create mode 100644 scripts/generate_self_preload.py

diff --git a/scripts/generate_self_preload.py b/scripts/generate_self_preload.py
new file mode 100644
index 0000000..2633f48
--- /dev/null
+++ b/scripts/generate_self_preload.py
@@ -0,0 +1,254 @@
+#!/usr/bin/env python
+# This file is part of ap_verify_ci_hits2015.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""Script for simulating the APDB preloads generated by associating this
+dataset's observations.
+
+This script assumes visits are processed in ascending numerical order, with no
+prior history. Prior sources must be provided through a different script.
+
+This script requires that all other input datasets are already in preloaded/,
+but does not require that export.yaml be up to date. It (or rather, the
+pipeline configs) requires that the repository be set up.
+
+This script takes roughly 10 minutes to run on rubin-devl.
+"""
+
+import glob
+import logging
+import os
+import subprocess
+import sys
+import tempfile
+
+import lsst.log
+from lsst.daf.butler import Butler, CollectionType, MissingCollectionError
+import lsst.obs.base
+import lsst.pipe.base
+import lsst.dax.apdb
+
+
+logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+lsst.log.configure_pylog_MDC("DEBUG", MDC_class=None)
+
+
+# Avoid explicit references to the dataset package to maximize portability.
+SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
+PIPE_DIR = os.path.join(SCRIPT_DIR, "..", "pipelines")
+RAW_DIR = os.path.join(SCRIPT_DIR, "..", "raw")
+RAW_RUN = "raw"
+PRELOAD_TYPES = ["preloaded_*"]
+DEST_DIR = os.path.join(SCRIPT_DIR, "..", "preloaded")
+DEST_COLLECTION = "dia_catalogs"
+DEST_RUN = DEST_COLLECTION + "/apdb"
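+# DEST_COLLECTION is the chained collection through which the preloaded
+# catalogs are published; DEST_RUN is the run that actually holds them. Both
+# are (re)created by the driver code at the bottom of this script.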
+
+
+########################################
+# Processing steps
+
+def _clear_preloaded(butler):
+    """Remove preloaded datasets from the collection's chain.
+
+    If it exists, ``DEST_RUN`` is removed entirely to keep it from interfering
+    with the rest of this script. Other runs are merely unlinked in case they
+    would still be useful.
+
+    Parameters
+    ----------
+    butler : `lsst.daf.butler.Butler`
+        A writeable Butler pointing to this repository.
+    """
+    try:
+        butler.collections.redefine_chain(DEST_COLLECTION, [])
+    except MissingCollectionError:
+        # No preloaded datasets to begin with
+        return
+    butler.removeRuns([DEST_RUN], unstore=True)
+
+
+def _copy_repo_to(src_butler, repo_dir):
+    """Create a repository that's a copy of this one.
+
+    Parameters
+    ----------
+    src_butler : `lsst.daf.butler.Butler`
+        A Butler pointing to this repository.
+    repo_dir : `str`
+        The directory in which to create the new repository.
+
+    Returns
+    -------
+    butler : `lsst.daf.butler.Butler`
+        A writeable Butler to the new repo.
+    """
+    # Don't use ap_verify code, to avoid a dependency on a potentially
+    # out-of-date export.yaml.
+    repo_config = Butler.makeRepo(repo_dir)
+    dest_butler = Butler(repo_config, writeable=True)
+    logging.debug("Temporary repo has universe version %d.", dest_butler.dimensions.version)
+    # Use export/import to preserve chained and calibration collections.
+    with tempfile.NamedTemporaryFile(suffix=".yaml") as export_file:
+        with src_butler.export(filename=export_file.name, transfer=None) as contents:
+            for t in src_butler.registry.queryDatasetTypes():
+                contents.saveDatasets(
+                    src_butler.query_datasets(t, collections="*", find_first=False, explain=False))
+            # Runs and dimensions are included automatically.
+            for coll in src_butler.collections.query("*", include_chains=True):
+                contents.saveCollection(coll)
+        dest_butler.import_(directory=DEST_DIR, filename=export_file.name, transfer="auto")
+    return dest_butler
+
+
+def _ingest_raws(repo, raw_dir, run):
+    """Ingest this dataset's raws into a specific repo.
+
+    Parameters
+    ----------
+    repo : `lsst.daf.butler.Butler`
+        A writeable Butler for the repository to ingest into.
+    raw_dir : `str`
+        The directory containing raw files.
+    run : `str`
+        The name of the run into which to import the raws.
+    """
+    raws = glob.glob(os.path.join(raw_dir, '**', '*.fits*'), recursive=True)
+    ingester = lsst.obs.base.RawIngestTask(butler=repo, config=lsst.obs.base.RawIngestConfig())
+    ingester.run(raws, run=run)
+    exposures = set(repo.registry.queryDataIds(["exposure"]))
+    definer = lsst.obs.base.DefineVisitsTask(butler=repo, config=lsst.obs.base.DefineVisitsConfig())
+    definer.run(exposures)
+
+
+def _check_pipeline(butler):
+    """Confirm that the pipeline is correctly configured.
+
+    Parameters
+    ----------
+    butler : `lsst.daf.butler.Butler`
+        A Butler pointing to this repository.
+    """
+    pipeline_file = os.path.join(PIPE_DIR, "ApPipe.yaml")
+    pipeline = lsst.pipe.base.Pipeline.fromFile(pipeline_file)
+    # The APDB config is only a placeholder; the real one is passed to pipetask later.
+    pipeline.addConfigOverride("parameters", "apdb_config", "foo")
+    # Check that the configs load correctly; raises if there's a setup missing.
+    pipeline.to_graph()
+
+
+def _build_catalogs(repo_dir, input_collections, output_collection):
+    """Simulate an AP pipeline run.
+
+    Parameters
+    ----------
+    repo_dir : `str`
+        The repository on which to run the pipeline.
+    input_collections : iterable [`str`]
+        The collections containing the pipeline inputs.
+    output_collection : `str`
+        The collection into which to generate preloaded catalogs.
+
+    Raises
+    ------
+    RuntimeError
+        Raised on any pipeline failure.
+    """
+    # There should be only one instrument.
+    butler = Butler(repo_dir)
+    instrument = butler.query_data_ids("instrument")[0]["instrument"]
+    visits = [coord["visit"] for coord in butler.query_data_ids("visit")]
+    pipeline_file = os.path.join(PIPE_DIR, "ApPipe.yaml")
+
+    # Create a temporary APDB.
+    apdb_location = f"sqlite:///{repo_dir}/apdb.db"
+    logging.debug("Creating apdb at %s...", apdb_location)
+    apdb_config = lsst.dax.apdb.ApdbSql.init_database(db_url=apdb_location)
+
+    with tempfile.NamedTemporaryFile(suffix=".py") as config_file:
+        apdb_config.save(config_file.name)
+
+        # Guarantee execution in observation order.
+        run_exists = False
+        for visit in sorted(visits):
+            logging.info("Generating catalogs for visit %d...", visit)
+            pipeline_args = ["pipetask", "run",
+                             "--butler-config", repo_dir,
+                             "--pipeline", pipeline_file,
+                             "--config", f"parameters:apdb_config='{config_file.name}'",
+                             "--input", ",".join(input_collections),
+                             # Can reuse the collection as long as data IDs don't overlap.
+                             "--output-run", output_collection,
+                             "--data-query", f"instrument='{instrument}' and visit={visit}",
+                             "--processes", "6",
+                             "--register-dataset-types",
+                             ]
+            if run_exists:
+                pipeline_args.append("--extend-run")
+            results = subprocess.run(pipeline_args, capture_output=False, shell=False, check=False)
+            run_exists = True
+            if results.returncode:
+                raise RuntimeError("Pipeline failed to run; see log for details.")
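+
+# For reference, each pass through the loop in _build_catalogs above invokes a
+# command of roughly this form (paths, instrument name, and visit number are
+# illustrative only):
+#
+#   pipetask run --butler-config <workspace> --pipeline pipelines/ApPipe.yaml \
+#       --config "parameters:apdb_config='<apdb config file>'" \
+#       --input raw,<instrument umbrella collection> \
+#       --output-run dia_catalogs/apdb \
+#       --data-query "instrument='<instrument>' and visit=<visit>" \
+#       --processes 6 --register-dataset-types
+#
+# with --extend-run appended for every visit after the first.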
+
+
+def _transfer_catalogs(catalog_types, src_repo, run, dest_repo):
+    """Copy preloaded catalogs between two repositories.
+
+    Parameters
+    ----------
+    catalog_types : iterable [`str`]
+        Query expressions for the dataset types of the preloaded catalogs.
+    src_repo : `lsst.daf.butler.Butler`
+        The repository from which to copy the datasets.
+    run : `str`
+        The name of the run containing the catalogs in both ``src_repo``
+        and ``dest_repo``.
+    dest_repo : `lsst.daf.butler.Butler`
+        The repository to which to copy the datasets.
+    """
+    expanded_types = src_repo.registry.queryDatasetTypes(catalog_types)
+    datasets = set()
+    for t in expanded_types:
+        datasets.update(src_repo.query_datasets(t, collections=run, explain=False))
+    dest_repo.transfer_from(src_repo, datasets, transfer="copy",
+                            register_dataset_types=True, transfer_dimensions=True)
+
+
+########################################
+# Put everything together
+
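+# Overall flow: validate the pipeline configs against the preloaded repo, drop
+# any previous dia_catalogs/apdb run, copy the repo into a temporary workspace,
+# ingest the raws there, run ApPipe visit by visit against a throwaway SQLite
+# APDB, then copy the resulting preloaded_* datasets back and chain them under
+# dia_catalogs.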
+preloaded = Butler(DEST_DIR, writeable=True)
+_check_pipeline(preloaded)
+logging.info("Removing old catalogs...")
+_clear_preloaded(preloaded)
+logging.info("Creating temporary repository...")
+with tempfile.TemporaryDirectory() as workspace:
+    temp_repo = _copy_repo_to(preloaded, workspace)
+    logging.info("Ingesting raws...")
+    _ingest_raws(temp_repo, RAW_DIR, RAW_RUN)
+    logging.info("Simulating DIA analysis...")
+    inst_name = temp_repo.query_data_ids("instrument")[0]["instrument"]
+    instrument = lsst.obs.base.Instrument.fromName(inst_name, temp_repo.registry)
+    _build_catalogs(workspace, [RAW_RUN, instrument.makeUmbrellaCollectionName()], DEST_RUN)
+    temp_repo.registry.refresh()  # Pipeline added dataset types
+    logging.debug("Preloaded repo has universe version %d.", preloaded.dimensions.version)
+    logging.info("Transferring catalogs to data set...")
+    _transfer_catalogs(PRELOAD_TYPES, temp_repo, DEST_RUN, preloaded)
+preloaded.collections.register(DEST_COLLECTION, CollectionType.CHAINED)
+preloaded.collections.prepend_chain(DEST_COLLECTION, DEST_RUN)
+
+logging.info("Preloaded APDB catalogs copied to %s:%s", DEST_DIR, DEST_COLLECTION)
diff --git a/scripts/make_preloaded.sh b/scripts/make_preloaded.sh
index 0abcbfb..265f5f2 100755
--- a/scripts/make_preloaded.sh
+++ b/scripts/make_preloaded.sh
@@ -25,11 +25,15 @@ python "${SCRIPT_DIR}/get_refcats.py"
 # pretrained NN models
 python "${SCRIPT_DIR}/get_nn_models.py" -m rbResnet50-DC2
 
+# Precomputed fake sources
 bash "${SCRIPT_DIR}/generate_fake_injection_catalog.sh" -b preloaded -o fake-injection-catalog
 
+# Preloaded APDB catalogs
+python "${SCRIPT_DIR}/generate_self_preload.py"
+
 # collection chains
 butler collection-chain "${DATASET_REPO}" LSSTCam-imSim/defaults templates/goodSeeing skymaps LSSTCam-imSim/calib \
-    refcats models fake-injection-catalog
+    refcats dia_catalogs models fake-injection-catalog
 
 # make the export file for ap_verify to use
 python "${SCRIPT_DIR}/make_preloaded_export.py"