Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log to HISTORY table #13

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions fixms/fix_ms_corrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,35 @@ def fix_ms_corrs(
corrected_data_column (str, optional): The name of the corrected data column. Defaults to "CORRECTED_DATA".
fix_stokes_factor (bool, optional): Whether to fix the Stokes factor. Defaults to True.
"""
logger.info(f"Correcting {data_column} of {str(ms)}.")

_function_args = [f"{key}={val}" for key, val in locals().items()]

logger.info(
f"Correcting {data_column} of {str(ms)}.", ms=ms, app_params=_function_args
)
# Do checks
with table(ms.as_posix(), readonly=True, ack=False) as tab:
cols = tab.colnames()
# Check if 'data_column' exists
if data_column not in cols:
logger.critical(f"Column {data_column} does not exist in {ms}! Exiting...")
logger.critical(
f"Column {data_column} does not exist in {ms}! Exiting...",
ms=ms,
app_params=_function_args,
)
return
# Check if 'corrected_data_column' exists
if corrected_data_column in cols:
logger.critical(
f"Column {corrected_data_column} already exists in {ms}! Exiting..."
f"Column {corrected_data_column} already exists in {ms}! Exiting...",
ms=ms,
app_params=_function_args,
)
if check_data(ms, data_column, corrected_data_column):
logger.critical(
f"We checked the data in {data_column} against {corrected_data_column} and it looks like the correction has already been applied!"
f"We checked the data in {data_column} against {corrected_data_column} and it looks like the correction has already been applied!",
ms=ms,
app_params=_function_args,
)
return

Expand All @@ -336,7 +348,7 @@ def fix_ms_corrs(

# Get the polarization axis
pol_axis = get_pol_axis(ms, feed_idx=feed_idx)
logger.info(f"Polarization axis is {pol_axis}")
logger.info(f"Polarization axis is {pol_axis}", ms=ms, app_params=_function_args)

# Get the data chunk by chunk and convert the correlations
# then write them back to the MS in the 'data_column' column
Expand All @@ -357,7 +369,9 @@ def fix_ms_corrs(
f"Column {corrected_data_column} already exists in {ms}! You should never see this message! "
f"Possible an existing {data_column} has already been corrected. "
f"No correction will be applied. "
)
),
ms=ms,
app_params=_function_args,
)
else:
# Only perform this correction if the data column was
Expand All @@ -380,7 +394,9 @@ def fix_ms_corrs(
start_row += len(data_chunk_cor)

logger.info(
f"Finished correcting {data_column} of {str(ms)}. Written to {corrected_data_column} column."
f"Finished correcting {data_column} of {str(ms)}. Written to {corrected_data_column} column.",
ms=ms,
app_params=_function_args,
)


Expand Down
30 changes: 17 additions & 13 deletions fixms/fix_ms_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def decs_rad(dec_string):


def fix_ms_dir(ms):
logger.info("Fixing FEED directions in %s" % (ms))
logger.info("Fixing FEED directions in %s" % (ms), ms=ms)
# Check that the observation wasn't in pol_fixed mode
with table("%s/ANTENNA" % (ms), readonly=True, ack=False) as ta:
ant_mount = ta.getcol("MOUNT", 0, 1)
Expand All @@ -240,24 +240,24 @@ def fix_ms_dir(ms):
beam = beam_from_ms(ms)

if not tableexists("%s/FIELD_OLD" % (ms)):
logger.info("Making copy of original FIELD table")
logger.info("Making copy of original FIELD table", ms=ms)
tablecopy(tablename="%s/FIELD" % (ms), newtablename="%s/FIELD_OLD" % (ms))
else:
logger.info("Original copy of FIELD table is being used")
logger.info("Original copy of FIELD table is being used", ms=ms)

if not tableexists("%s/FEED_OLD" % (ms)):
logger.info("Making copy of original FEED table")
logger.info("Making copy of original FEED table", ms=ms)
tablecopy(tablename="%s/FEED" % (ms), newtablename="%s/FEED_OLD" % (ms))
else:
logger.info("Original copy of FEED table is being used")
logger.info("Original copy of FEED table is being used", ms=ms)

logger.info("Reading phase directions")
logger.info("Reading phase directions", ms=ms)
with table("%s/FIELD_OLD" % (ms), readonly=True, ack=False) as tp:
ms_phase = tp.getcol("PHASE_DIR")

# Work out how many fields are in the MS.
n_fields = ms_phase.shape[0]
logger.info("Found %d fields in FIELD table" % (n_fields))
logger.info("Found %d fields in FIELD table" % (n_fields), ms=ms)

# Open up the MS FEED table so we can work out what the offset is for the beam.
with table("%s/FEED" % (ms), readonly=False, ack=False) as tf:
Expand All @@ -277,7 +277,9 @@ def fix_ms_dir(ms):
n_offsets = t1.getcol("BEAM_OFFSET").shape[0]
offset_times = t1.getcol("TIME")
offset_intervals = t1.getcol("INTERVAL")
logger.info("Found %d offsets in FEED table for beam %d" % (n_offsets, beam))
logger.info(
"Found %d offsets in FEED table for beam %d" % (n_offsets, beam), ms=ms
)
for offset_index in trange(n_offsets, desc="Fixing offsets", file=TQDM_OUT):
offset = t1.getcol("BEAM_OFFSET")[offset_index]
logger.info(
Expand All @@ -288,7 +290,8 @@ def fix_ms_dir(ms):
offset_times[offset_index] + offset_intervals[offset_index] / 2.0,
-offset[0][0] * 180.0 / np.pi,
offset[0][1] * 180.0 / np.pi,
)
),
ms=ms,
)

# Update the beam position for each field
Expand All @@ -300,7 +303,7 @@ def fix_ms_dir(ms):
)
time_data = tfdata.getcol("TIME")
if len(time_data) == 0:
# logger.info("Warning: Couldn't find valid data for field %d" %(field))
# logger.info("Warning: Couldn't find valid data for field %d" %(field), ms=ms)
continue

offset_index = -1
Expand All @@ -315,7 +318,7 @@ def fix_ms_dir(ms):
offset_index = offset
break

# logger.info("Field %d : t=%f : offset=%d" %(field, time_data[0], offset_index))
# logger.info("Field %d : t=%f : offset=%d" %(field, time_data[0], offset_index), ms=ms)
# Obtain the offset for the current field.
offset = t1.getcol("BEAM_OFFSET")[offset_index]

Expand All @@ -337,7 +340,8 @@ def fix_ms_dir(ms):
time_data[0],
time_data[-1],
offset_index,
)
),
ms=ms,
)
# Update the FIELD table with the beam position
new_ra = new_pos.ra
Expand All @@ -351,7 +355,7 @@ def fix_ms_dir(ms):
tp.putcol("PHASE_DIR", ms_phase)
tp.putcol("REFERENCE_DIR", ms_phase)

logger.info("Finished fixed FEED directions")
logger.info("Finished fixed FEED directions", ms=ms)


def cli():
Expand Down
79 changes: 77 additions & 2 deletions fixms/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,81 @@

import io
import logging
import sys
from importlib.metadata import version
from typing import List, Optional

import astropy.units as u
from astropy.time import Time
from casacore.tables import table


def update_history(
    ms: str,
    message: str,
    app_params: Optional[List[str]] = None,
    obs_id: int = 0,
    priority: str = "NORMAL",
) -> None:
    """Append a row to the HISTORY subtable of a MeasurementSet.

    Args:
        ms (str): Path to the MeasurementSet to update.
        message (str): Message text stored in the MESSAGE column.
        app_params (Optional[List[str]], optional): ``key=value`` strings
            recorded in the APP_PARAMS column. Defaults to None (empty list).
        obs_id (int, optional): Value for the OBSERVATION_ID column. Defaults to 0.
        priority (str, optional): One of "DEBUGGING", "WARN", "NORMAL",
            "SEVERE". Defaults to "NORMAL".

    Raises:
        ValueError: If ``priority`` is not an allowed value.
    """
    _allowed_priorities = ("DEBUGGING", "WARN", "NORMAL", "SEVERE")
    if priority not in _allowed_priorities:
        raise ValueError(
            f"Priority must be one of {_allowed_priorities}, got {priority}"
        )

    # Avoid the shared-mutable-default pitfall: build a fresh list per call.
    if app_params is None:
        app_params = []

    this_program = f"fixms-{version('fixms')}"
    # HISTORY stores TIME as seconds (MJD epoch), not days.
    now = (Time.now().mjd * u.day).to(u.second).value
    cli_args = sys.argv
    history_row = {
        "TIME": now,
        "OBSERVATION_ID": obs_id,
        "MESSAGE": message,
        "PRIORITY": priority,
        "CLI_COMMAND": cli_args,
        "APP_PARAMS": app_params,
        "ORIGIN": this_program,
    }
    # ack=False keeps casacore quiet, consistent with other table() calls
    # in this package.
    with table(f"{ms}/HISTORY", readonly=False, ack=False) as history:
        history.addrows(1)
        for key, value in history_row.items():
            history.putcell(key, history.nrows() - 1, value)


class LoggerWithHistory(logging.Logger):
    """Custom logger that also updates the HISTORY table in the MS.

    Every logging method accepts two extra keyword-only arguments:
        ms: Path to a MeasurementSet. When not None, the message is also
            appended to its HISTORY subtable via ``update_history``.
        app_params: List of ``key=value`` strings recorded in APP_PARAMS.
    """

    # Translate numeric logging levels into the priority strings accepted
    # by update_history(); log() previously forwarded the raw int, which
    # made update_history() raise ValueError.
    _LEVEL_TO_PRIORITY = {
        logging.DEBUG: "DEBUGGING",
        logging.INFO: "NORMAL",
        logging.WARNING: "WARN",
        logging.ERROR: "SEVERE",
        logging.CRITICAL: "SEVERE",
    }

    def _record_history(self, ms, message, app_params, priority):
        """Write to the MS HISTORY table if an MS was supplied."""
        if ms is not None:
            # `app_params or []` avoids sharing a mutable default.
            update_history(ms, message, app_params or [], priority=priority)

    def info(self, message, *args, ms=None, app_params=None, **kwargs):
        super().info(message, *args, **kwargs)
        self._record_history(ms, message, app_params, "NORMAL")

    def warning(self, message, *args, ms=None, app_params=None, **kwargs):
        super().warning(message, *args, **kwargs)
        self._record_history(ms, message, app_params, "WARN")

    def error(self, message, *args, ms=None, app_params=None, **kwargs):
        super().error(message, *args, **kwargs)
        self._record_history(ms, message, app_params, "SEVERE")

    def critical(self, message, *args, ms=None, app_params=None, **kwargs):
        # BUG FIX: previously delegated to super().error(), which logged
        # critical messages at ERROR level.
        super().critical(message, *args, **kwargs)
        self._record_history(ms, message, app_params, "SEVERE")

    def debug(self, message, *args, ms=None, app_params=None, **kwargs):
        super().debug(message, *args, **kwargs)
        self._record_history(ms, message, app_params, "DEBUGGING")

    def log(self, level, message, *args, ms=None, app_params=None, **kwargs):
        super().log(level, message, *args, **kwargs)
        # Unknown custom levels fall back to NORMAL rather than crashing.
        self._record_history(
            ms, message, app_params, self._LEVEL_TO_PRIORITY.get(level, "NORMAL")
        )


class TqdmToLogger(io.StringIO):
Expand Down Expand Up @@ -58,7 +133,7 @@ def format(self, record):

def get_fixms_logger(
name: str = "fixms", attach_handler: bool = True
) -> logging.Logger:
) -> LoggerWithHistory:
"""Will construct a logger object.

Args:
Expand All @@ -69,7 +144,7 @@ def get_fixms_logger(
logging.Logger: The appropriate logger
"""
logging.captureWarnings(True)
logger = logging.getLogger(name)
logger = LoggerWithHistory(name)
logger.setLevel(logging.WARNING)

if attach_handler:
Expand Down
Loading