Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-47607: Add a one-off exposure record processor #108

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/lsst/rubintv/production/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"comcam_noise_map",
"comcam_focal_plane_mosaic",
"comcam_metadata",
"comcam_mount",
"comcam_sim_noise_map",
"comcam_sim_focal_plane_mosaic",
"comcam_sim_calexp_mosaic",
Expand Down
111 changes: 105 additions & 6 deletions python/lsst/rubintv/production/oneOffProcessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,30 @@

from __future__ import annotations

from typing import TYPE_CHECKING
import tempfile
from typing import TYPE_CHECKING, Any

import matplotlib.pyplot as plt
import numpy as np

import lsst.daf.butler as dafButler
from lsst.afw.geom import ellipses
from lsst.pipe.tasks.peekExposure import PeekExposureTask, PeekExposureTaskConfig
from lsst.summit.utils.efdUtils import getEfdData, makeEfdClient
from lsst.summit.utils.simonyi.mountAnalysis import (
MOUNT_IMAGE_BAD_LEVEL,
MOUNT_IMAGE_WARNING_LEVEL,
N_REPLACED_BAD_LEVEL,
N_REPLACED_WARNING_LEVEL,
calculateMountErrors,
plotMountErrors,
)
from lsst.summit.utils.utils import calcEclipticCoords

from .baseChannels import BaseButlerChannel
from .redisUtils import RedisHelper
from .utils import isCalibration, raiseIf, writeMetadataShard
from .uploaders import MultiUploader
from .utils import getFilterColorName, getRubinTvInstrumentName, isCalibration, raiseIf, writeMetadataShard

if TYPE_CHECKING:
from lsst.afw.image import Exposure
Expand Down Expand Up @@ -111,6 +122,11 @@ def __init__(
self.detector = detectorNumber
self.shardsDirectory = shardsDirectory
self.processingStage = processingStage
if self.processingStage == "expRecord":
# remove this conditional once we have the squid proxy
self.uploader = MultiUploader()

self.mountFigure = plt.figure(figsize=(10, 8))

peekConfig = PeekExposureTaskConfig()
self.peekTask = PeekExposureTask(config=peekConfig)
Expand Down Expand Up @@ -194,8 +210,6 @@ def runPostISRCCD(self, dataId: DataCoordinate) -> None:
self.log.info(f"Waiting for postISRCCD for {dataId}")
(expRecord,) = self.butler.registry.queryDimensionRecords("exposure", dataId=dataId)

self.calcTimeSincePrevious(expRecord) # do this while we wait for the postISR to land

# redis signal is sent on the dispatch of the raw, so 40s is plenty but
# not too much
postISR = self._waitForDataProduct(dataId, gettingButler=self.butler, timeout=40)
Expand Down Expand Up @@ -303,17 +317,102 @@ def runCalexp(self, dataId: DataCoordinate) -> None:

return

@staticmethod
def _setFlag(
value: float, key: str, warningLevel: float, badLevel: float, outputDict: dict[str, Any]
) -> dict[str, Any]:
if value >= warningLevel:
flag = f"_{key}"
outputDict[flag] = "warning"
elif value >= badLevel:
flag = f"_{key}"
outputDict[flag] = "bad"
return outputDict

def runMountAnalysis(self, expRecord: DimensionRecord) -> None:
errors, data = calculateMountErrors(expRecord, self.efdClient)
if errors is None or data is None:
self.log.warning(f"Failed to calculate mount errors for {expRecord.id}")
return

assert errors is not None
assert data is not None

outputDict = {}

value = errors.imageImpactRms
key = "Mount motion image degradation"
outputDict[key] = f"{value:.3f}"
outputDict = self._setFlag(value, key, MOUNT_IMAGE_WARNING_LEVEL, MOUNT_IMAGE_BAD_LEVEL, outputDict)

value = errors.azRms
key = "Mount azimuth RMS"
outputDict[key] = f"{value:.3f}"

value = errors.elRms
key = "Mount elevation RMS"
outputDict[key] = f"{value:.3f}"

value = errors.rotRms
key = "Mount rotator RMS"
outputDict[key] = f"{value:.3f}"

value = errors.nReplacedAz
key = "Mount azimuth points replaced"
outputDict[key] = f"{value}"
outputDict = self._setFlag(value, key, N_REPLACED_WARNING_LEVEL, N_REPLACED_BAD_LEVEL, outputDict)

value = errors.nReplacedEl
key = "Mount elevation points replaced"
outputDict[key] = f"{value}"
outputDict = self._setFlag(value, key, N_REPLACED_WARNING_LEVEL, N_REPLACED_BAD_LEVEL, outputDict)

dayObs = expRecord.day_obs
seqNum = expRecord.seq_num
rowData = {seqNum: outputDict}
self.log.info(f"Writing mount analysis shard for {dayObs}-{seqNum}")
writeMetadataShard(self.shardsDirectory, dayObs, rowData)

# TODO: DM-45437 Use a context manager here and everywhere
self.log.info(f"Creating mount plot for {dayObs}-{seqNum}")
tempFilename = tempfile.mktemp(suffix=".png")
plotMountErrors(data, errors, self.mountFigure, saveFilename=tempFilename)
self.uploader.uploadPerSeqNumPlot(
instrument=getRubinTvInstrumentName(expRecord.instrument),
plotName="mount",
dayObs=expRecord.day_obs,
seqNum=expRecord.seq_num,
filename=tempFilename,
)
self.mountFigure.clear()
self.mountFigure.gca().clear()

def setFilterCellColor(self, expRecord: DimensionRecord) -> None:
filterName = expRecord.physical_filter
filterColor = getFilterColorName(filterName)
if filterColor:
md = {expRecord.seq_num: {"_Filter": filterColor}}
writeMetadataShard(self.shardsDirectory, expRecord.day_obs, md)

def runExpRecord(self, expRecord: DimensionRecord) -> None:
self.calcTimeSincePrevious(expRecord)
self.setFilterCellColor(expRecord)
self.runMountAnalysis(expRecord)

def callback(self, payload: Payload) -> None:
dataId: DataCoordinate = payload.dataIds[0]
if len(payload.dataIds) > 1:
raise ValueError(f"Expected only one dataId, got {len(payload.dataIds)}")

dataId = dafButler.DataCoordinate.standardize(dataId, detector=self.detector)

match self.processingStage:
case "expRecord":
(expRecord,) = self.butler.registry.queryDimensionRecords("exposure", dataId=dataId)
self.runExpRecord(expRecord)
case "postISRCCD":
dataId = dafButler.DataCoordinate.standardize(dataId, detector=self.detector)
self.runPostISRCCD(dataId)
case "calexp":
dataId = dafButler.DataCoordinate.standardize(dataId, detector=self.detector)
self.runCalexp(dataId)
case _:
raise ValueError(f"Unknown processing stage {self.processingStage}")
2 changes: 2 additions & 0 deletions python/lsst/rubintv/production/podDefinition.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class PodFlavor(Enum):
STEP2A_WORKER = auto()
STEP2A_AOS_WORKER = auto()
MOSAIC_WORKER = auto()
ONE_OFF_EXPRECORD_WORKER = auto()
ONE_OFF_POSTISR_WORKER = auto()
ONE_OFF_CALEXP_WORKER = auto()

Expand Down Expand Up @@ -71,6 +72,7 @@ def podFlavorToPodType(podFlavor: PodFlavor) -> PodType:
PodFlavor.STEP2A_WORKER: PodType.PER_INSTRUMENT,
PodFlavor.STEP2A_AOS_WORKER: PodType.PER_INSTRUMENT,
PodFlavor.MOSAIC_WORKER: PodType.PER_INSTRUMENT,
PodFlavor.ONE_OFF_EXPRECORD_WORKER: PodType.PER_INSTRUMENT, # one per focal plane, det is meaningless
PodFlavor.ONE_OFF_POSTISR_WORKER: PodType.PER_INSTRUMENT, # hard codes a detector number
PodFlavor.ONE_OFF_CALEXP_WORKER: PodType.PER_INSTRUMENT, # hard codes a detector number
}
Expand Down
1 change: 1 addition & 0 deletions python/lsst/rubintv/production/processingControl.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ def run(self) -> None:
expRecord = self.getNewExposureAndDefineVisit()
if expRecord is not None:
assert self.instrument == expRecord.instrument
self.dispatchOneOffProcessing(expRecord, podFlavor=PodFlavor.ONE_OFF_EXPRECORD_WORKER)
writeExpRecordMetadataShard(expRecord, getShardPath(self.locationConfig, expRecord))
if not isWepImage(expRecord):
self.doStep1FanoutSfm(expRecord)
Expand Down
26 changes: 26 additions & 0 deletions python/lsst/rubintv/production/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1349,3 +1349,29 @@ def removeDetector(dataCoord: DataCoordinate, butler: Butler) -> DataCoordinate:
"""
noDetector = {k: v for k, v in dataCoord.required.items() if k != "detector"}
return DataCoordinate.standardize(noDetector, universe=butler.dimensions)


def getFilterColorName(physicalFilter: str) -> str | None:
"""Get the color name for a physical filter to color cells on RubinTV.

If the color doesn't have a mapping, ``None`` is returned.

Parameters
----------
physicalFilter : `str`
The physical filter name.

Returns
-------
colorName : `str`
The color name.
"""
filterMap = {
"u_02": "u_color",
"g_01": "g_color",
"r_03": "r_color",
"i_06": "i_color",
"z_03": "z_color",
"y_04": "y_color",
}
return filterMap.get(physicalFilter)
65 changes: 65 additions & 0 deletions scripts/LSSTComCam/runOneOffExpRecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# This file is part of rubintv_production.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from lsst.daf.butler import Butler
from lsst.rubintv.production.oneOffProcessing import OneOffProcessor
from lsst.rubintv.production.podDefinition import PodDetails, PodFlavor
from lsst.rubintv.production.utils import getAutomaticLocationConfig, getDoRaise
from lsst.summit.utils.utils import setupLogging

setupLogging()
instrument = "LSSTComCam"

workerNum = 0

locationConfig = getAutomaticLocationConfig()

podDetails = PodDetails(
instrument=instrument, podFlavor=PodFlavor.ONE_OFF_EXPRECORD_WORKER, detectorNumber=None, depth=workerNum
)
print(
f"Running {podDetails.instrument} {podDetails.podFlavor.name} at {locationConfig.location},"
f"consuming from {podDetails.queueName}..."
)

butler = Butler.from_config(
locationConfig.comCamButlerPath,
collections=[
f"{instrument}/defaults",
f"{instrument}/quickLook", # accesses the outputs
],
writeable=True,
)

metadataDirectory = locationConfig.comCamMetadataPath
shardsDirectory = locationConfig.comCamMetadataShardPath

oneOffProcessor = OneOffProcessor(
butler=butler,
locationConfig=locationConfig,
instrument=instrument,
podDetails=podDetails,
detectorNumber=0, # unused, but needs to be set for now. Maybe change later, but it looked annoying to do
shardsDirectory=shardsDirectory,
processingStage="expRecord",
doRaise=getDoRaise(),
)
oneOffProcessor.run()