Skip to content

Commit

Permalink
Merge pull request #13 from AlecThomson/history
Browse files Browse the repository at this point in the history
Log to HISTORY table
  • Loading branch information
AlecThomson authored Feb 7, 2024
2 parents 51f07b1 + 8540b20 commit 5498b3d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 22 deletions.
30 changes: 23 additions & 7 deletions fixms/fix_ms_corrs.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,35 @@ def fix_ms_corrs(
corrected_data_column (str, optional): The name of the corrected data column. Defaults to "CORRECTED_DATA".
fix_stokes_factor (bool, optional): Whether to fix the Stokes factor. Defaults to True.
"""
logger.info(f"Correcting {data_column} of {str(ms)}.")

_function_args = [f"{key}={val}" for key, val in locals().items()]

logger.info(
f"Correcting {data_column} of {str(ms)}.", ms=ms, app_params=_function_args
)
# Do checks
with table(ms.as_posix(), readonly=True, ack=False) as tab:
cols = tab.colnames()
# Check if 'data_column' exists
if data_column not in cols:
logger.critical(f"Column {data_column} does not exist in {ms}! Exiting...")
logger.critical(
f"Column {data_column} does not exist in {ms}! Exiting...",
ms=ms,
app_params=_function_args,
)
return
# Check if 'corrected_data_column' exists
if corrected_data_column in cols:
logger.critical(
f"Column {corrected_data_column} already exists in {ms}! Exiting..."
f"Column {corrected_data_column} already exists in {ms}! Exiting...",
ms=ms,
app_params=_function_args,
)
if check_data(ms, data_column, corrected_data_column):
logger.critical(
f"We checked the data in {data_column} against {corrected_data_column} and it looks like the correction has already been applied!"
f"We checked the data in {data_column} against {corrected_data_column} and it looks like the correction has already been applied!",
ms=ms,
app_params=_function_args,
)
return

Expand All @@ -336,7 +348,7 @@ def fix_ms_corrs(

# Get the polarization axis
pol_axis = get_pol_axis(ms, feed_idx=feed_idx)
logger.info(f"Polarization axis is {pol_axis}")
logger.info(f"Polarization axis is {pol_axis}", ms=ms, app_params=_function_args)

# Get the data chunk by chunk and convert the correlations
# then write them back to the MS in the 'data_column' column
Expand All @@ -357,7 +369,9 @@ def fix_ms_corrs(
f"Column {corrected_data_column} already exists in {ms}! You should never see this message! "
f"Possible an existing {data_column} has already been corrected. "
f"No correction will be applied. "
)
),
ms=ms,
app_params=_function_args,
)
else:
# Only perform this correction if the data column was
Expand All @@ -380,7 +394,9 @@ def fix_ms_corrs(
start_row += len(data_chunk_cor)

logger.info(
f"Finished correcting {data_column} of {str(ms)}. Written to {corrected_data_column} column."
f"Finished correcting {data_column} of {str(ms)}. Written to {corrected_data_column} column.",
ms=ms,
app_params=_function_args,
)


Expand Down
30 changes: 17 additions & 13 deletions fixms/fix_ms_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def decs_rad(dec_string):


def fix_ms_dir(ms):
logger.info("Fixing FEED directions in %s" % (ms))
logger.info("Fixing FEED directions in %s" % (ms), ms=ms)
# Check that the observation wasn't in pol_fixed mode
with table("%s/ANTENNA" % (ms), readonly=True, ack=False) as ta:
ant_mount = ta.getcol("MOUNT", 0, 1)
Expand All @@ -240,24 +240,24 @@ def fix_ms_dir(ms):
beam = beam_from_ms(ms)

if not tableexists("%s/FIELD_OLD" % (ms)):
logger.info("Making copy of original FIELD table")
logger.info("Making copy of original FIELD table", ms=ms)
tablecopy(tablename="%s/FIELD" % (ms), newtablename="%s/FIELD_OLD" % (ms))
else:
logger.info("Original copy of FIELD table is being used")
logger.info("Original copy of FIELD table is being used", ms=ms)

if not tableexists("%s/FEED_OLD" % (ms)):
logger.info("Making copy of original FEED table")
logger.info("Making copy of original FEED table", ms=ms)
tablecopy(tablename="%s/FEED" % (ms), newtablename="%s/FEED_OLD" % (ms))
else:
logger.info("Original copy of FEED table is being used")
logger.info("Original copy of FEED table is being used", ms=ms)

logger.info("Reading phase directions")
logger.info("Reading phase directions", ms=ms)
with table("%s/FIELD_OLD" % (ms), readonly=True, ack=False) as tp:
ms_phase = tp.getcol("PHASE_DIR")

# Work out how many fields are in the MS.
n_fields = ms_phase.shape[0]
logger.info("Found %d fields in FIELD table" % (n_fields))
logger.info("Found %d fields in FIELD table" % (n_fields), ms=ms)

# Open up the MS FEED table so we can work out what the offset is for the beam.
with table("%s/FEED" % (ms), readonly=False, ack=False) as tf:
Expand All @@ -277,7 +277,9 @@ def fix_ms_dir(ms):
n_offsets = t1.getcol("BEAM_OFFSET").shape[0]
offset_times = t1.getcol("TIME")
offset_intervals = t1.getcol("INTERVAL")
logger.info("Found %d offsets in FEED table for beam %d" % (n_offsets, beam))
logger.info(
"Found %d offsets in FEED table for beam %d" % (n_offsets, beam), ms=ms
)
for offset_index in trange(n_offsets, desc="Fixing offsets", file=TQDM_OUT):
offset = t1.getcol("BEAM_OFFSET")[offset_index]
logger.info(
Expand All @@ -288,7 +290,8 @@ def fix_ms_dir(ms):
offset_times[offset_index] + offset_intervals[offset_index] / 2.0,
-offset[0][0] * 180.0 / np.pi,
offset[0][1] * 180.0 / np.pi,
)
),
ms=ms,
)

# Update the beam position for each field
Expand All @@ -300,7 +303,7 @@ def fix_ms_dir(ms):
)
time_data = tfdata.getcol("TIME")
if len(time_data) == 0:
# logger.info("Warning: Couldn't find valid data for field %d" %(field))
# logger.info("Warning: Couldn't find valid data for field %d" %(field), ms=ms)
continue

offset_index = -1
Expand All @@ -315,7 +318,7 @@ def fix_ms_dir(ms):
offset_index = offset
break

# logger.info("Field %d : t=%f : offset=%d" %(field, time_data[0], offset_index))
# logger.info("Field %d : t=%f : offset=%d" %(field, time_data[0], offset_index), ms=ms)
# Obtain the offset for the current field.
offset = t1.getcol("BEAM_OFFSET")[offset_index]

Expand All @@ -337,7 +340,8 @@ def fix_ms_dir(ms):
time_data[0],
time_data[-1],
offset_index,
)
),
ms=ms,
)
# Update the FIELD table with the beam position
new_ra = new_pos.ra
Expand All @@ -351,7 +355,7 @@ def fix_ms_dir(ms):
tp.putcol("PHASE_DIR", ms_phase)
tp.putcol("REFERENCE_DIR", ms_phase)

logger.info("Finished fixed FEED directions")
logger.info("Finished fixed FEED directions", ms=ms)


def cli():
Expand Down
79 changes: 77 additions & 2 deletions fixms/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,81 @@

import io
import logging
import sys
from importlib.metadata import version
from typing import List

import astropy.units as u
from astropy.time import Time
from casacore.tables import table


def update_history(
ms: str,
message: str,
app_params: List[str] = [],
obs_id: int = 0,
priority: str = "NORMAL",
) -> None:
_allowed_priorities = ("DEBUGGING", "WARN", "NORMAL", "SEVERE")
if priority not in _allowed_priorities:
raise ValueError(
f"Priority must be one of {_allowed_priorities}, got {priority}"
)

this_program = f"fixms-{version('fixms')}"
now = (Time.now().mjd * u.day).to(u.second).value
cli_args = sys.argv
history_row = {
"TIME": now,
"OBSERVATION_ID": obs_id,
"MESSAGE": message,
"PRIORITY": priority,
"CLI_COMMAND": cli_args,
"APP_PARAMS": app_params,
"ORIGIN": this_program,
}
with table(f"{ms}/HISTORY", readonly=False) as history:
history.addrows(1)
for key, value in history_row.items():
history.putcell(key, history.nrows() - 1, value)


class LoggerWithHistory(logging.Logger):
"""Custom logger that will also update the HISTORY table in the MS."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def info(self, message, *args, ms=None, app_params=[], **kwargs):
super().info(message, *args, **kwargs)
if ms is not None:
update_history(ms, message, app_params, priority="NORMAL")

def warning(self, message, *args, ms=None, app_params=[], **kwargs):
super().warning(message, *args, **kwargs)
if ms is not None:
update_history(ms, message, app_params, priority="WARN")

def error(self, message, *args, ms=None, app_params=[], **kwargs):
super().error(message, *args, **kwargs)
if ms is not None:
update_history(ms, message, app_params, priority="SEVERE")

def critical(self, message, *args, ms=None, app_params=[], **kwargs):
super().error(message, *args, **kwargs)
if ms is not None:
update_history(ms, message, app_params, priority="SEVERE")

def debug(self, message, *args, ms=None, app_params=[], **kwargs):
super().debug(message, *args, **kwargs)
if ms is not None:
update_history(ms, message, app_params, priority="DEBUGGING")

def log(self, level, message, *args, ms=None, app_params=[], **kwargs):
super().log(level, message, *args, **kwargs)
if ms is not None:
update_history(ms, message, app_params, priority=level)


class TqdmToLogger(io.StringIO):
Expand Down Expand Up @@ -58,7 +133,7 @@ def format(self, record):

def get_fixms_logger(
name: str = "fixms", attach_handler: bool = True
) -> logging.Logger:
) -> LoggerWithHistory:
"""Will construct a logger object.
Args:
Expand All @@ -69,7 +144,7 @@ def get_fixms_logger(
logging.Logger: The appropriate logger
"""
logging.captureWarnings(True)
logger = logging.getLogger(name)
logger = LoggerWithHistory(name)
logger.setLevel(logging.WARNING)

if attach_handler:
Expand Down

0 comments on commit 5498b3d

Please sign in to comment.