diff --git a/data/SkyPy/lsst-like.yml b/data/SkyPy/lsst-like.yml index 98d4326ae..c94ab2414 100644 --- a/data/SkyPy/lsst-like.yml +++ b/data/SkyPy/lsst-like.yml @@ -34,7 +34,7 @@ tables: magnitudes: $blue.M coefficients: $blue.coeff filter: bessell-B - mag_g, mag_r, mag_i, mag_z, mag_Y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes + mag_g, mag_r, mag_i, mag_z, mag_y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes coefficients: $blue.coeff filters: $filters redshift: $blue.z @@ -79,7 +79,7 @@ tables: magnitudes: $red.M coefficients: $red.coeff filter: bessell-B - mag_g, mag_r, mag_i, mag_z, mag_Y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes + mag_g, mag_r, mag_i, mag_z, mag_y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes coefficients: $red.coeff filters: $filters redshift: $red.z diff --git a/data/SkyPy/lsst-like_triple_SF.yml b/data/SkyPy/lsst-like_triple_SF.yml index 95fc2f034..95229159e 100644 --- a/data/SkyPy/lsst-like_triple_SF.yml +++ b/data/SkyPy/lsst-like_triple_SF.yml @@ -30,7 +30,7 @@ tables: coefficients: $blue.coeff magnitudes: $blue.M filter: bessell-B - mag_g, mag_r, mag_i, mag_z, mag_Y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes + mag_g, mag_r, mag_i, mag_z, mag_y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes coefficients: $blue.coeff redshift: $blue.z filters: $filters @@ -84,7 +84,7 @@ tables: coefficients: $red.coeff magnitudes: $red.M filter: bessell-B - mag_g, mag_r, mag_i, mag_z, mag_Y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes + mag_g, mag_r, mag_i, mag_z, mag_y: !skypy.galaxies.spectrum.kcorrect.apparent_magnitudes coefficients: $red.coeff redshift: $red.z filters: $filters diff --git a/slsim/LsstSciencePipeline/opsim_pipeline.py b/slsim/LsstSciencePipeline/opsim_pipeline.py index 4b23e0186..f1ce441da 100644 --- a/slsim/LsstSciencePipeline/opsim_pipeline.py +++ b/slsim/LsstSciencePipeline/opsim_pipeline.py @@ -131,6 +131,7 @@ def opsim_time_series_images_data( obs_time, expo_time, zero_point_mag, + psf_fwhm, radec_list, bandpass, ], @@ -140,6 +141,7 @@ def opsim_time_series_images_data( "obs_time", "expo_time", "zero_point", + "psf_fwhm", "calexp_center", "band", ), diff --git a/slsim/LsstSciencePipeline/util_lsst.py b/slsim/LsstSciencePipeline/util_lsst.py index b25ab38b6..b062c782c 100644 --- a/slsim/LsstSciencePipeline/util_lsst.py +++ b/slsim/LsstSciencePipeline/util_lsst.py @@ -5,8 +5,12 @@ fits_append_table, convert_mjd_to_days, transient_event_time_mjd, + flux_error_to_magnitude_error, + transformmatrix_to_pixelscale, + magnitude_to_amplitude, ) import os +import warnings def variable_lens_injection( @@ -210,3 +214,373 @@ def opsim_variable_lens_injection( exposure_data_new.add_columns([lens_col, final_image_col]) return exposure_data_new + + +def optimized_transient_event_time_mjd( + mjd_times, lightcurve_range=(-50, 100), min_points=100, optimized_cadence=True +): + """Finds a transient start time such that at least min_points fall within + the light curve range. If no points meet the requirement for the minimum + number of observations, a start point will be randomly selected from the + given observation times. Additionally, if optimized_cadence is set to + False, the start point will also be chosen randomly. + + :param mjd_times: List or array of MJD times. + :param lightcurve_range: Range of lightcurve in days (e.g., (-50, + 100)). + :param min_points: Minimum number of points required in the range. + eg: 100 + :param optimized_cadence: Boolean. 
If True, search for a time interval that contains at least the
+        given minimum number of observation points.
+    :return: Chosen transient start point in MJD. If no window contains
+        min_points observations, or if optimized_cadence is False, a
+        start point drawn randomly from the observation times is
+        returned.
+    """
+    mjd_times = np.array(mjd_times)
+    min_mjd, max_mjd = min(mjd_times), max(mjd_times)
+
+    if optimized_cadence is True:
+        # Iterate through possible zero_point_mjd values
+        for zero_point_mjd in np.linspace(
+            min_mjd, max_mjd, 1000
+        ):  # Test multiple candidates
+            start, end = (
+                zero_point_mjd + lightcurve_range[0],
+                zero_point_mjd + lightcurve_range[1],
+            )
+            points_in_range = np.sum((mjd_times >= start) & (mjd_times <= end))
+
+            if points_in_range >= min_points:
+                start_point = zero_point_mjd  # first valid zero_point_mjd
+                break
+        else:
+            warning_msg = (
+                "In the given observations, there is no observation window that"
+                " contains %s observation points, so a random start point is chosen."
+                % min_points
+            )
+            warnings.warn(warning_msg, category=UserWarning, stacklevel=2)
+            start_point = transient_event_time_mjd(min_mjd=min_mjd, max_mjd=max_mjd)
+    else:
+        start_point = transient_event_time_mjd(min_mjd=min_mjd, max_mjd=max_mjd)
+    return start_point
+
+
+def transient_data_with_cadence(
+    lens_class,
+    exposure_data,
+    transform_pix2angle=None,
+    num_pix=61,
+    optimized_cadence=True,
+    min_points=100,
+    noise=True,
+    symmetric=False,
+):
+    """Puts a lensed transient into the provided cadence. For LSST, this
+    will be the cadence from opsim.
+
+    :param lens_class: Lens() object
+    :param exposure_data: An astropy table of exposure data; one entry of
+        the table_list_data generated by the
+        opsim_time_series_images_data function. It must contain the rms
+        of the background noise fluctuations (column "bkg_noise"), the
+        pixel psf kernel of each single-exposure image in the time
+        series (column "psf_kernel"), the psf FWHM of each exposure
+        (column "psf_fwhm"), the observation time of each exposure in
+        MJD (column "obs_time"), the exposure time of each exposure
+        (column "expo_time"), the magnitude zero point of each exposure
+        (column "zero_point"), the (ra, dec) coordinates of the object
+        (column "calexp_center"), and the band in which each
+        observation is taken (column "band").
+    :param transform_pix2angle: Transformation matrix (2x2) of pixels
+        into coordinate displacements.
+    :param num_pix: number of pixels for the image.
+    :param optimized_cadence: Boolean. If True, search for the time
+        interval where the given minimum number of observation points
+        occur.
+    :param min_points: minimum number of observations in a certain time
+        window.
+    :param noise: Boolean. If True, Gaussian noise is added to the
+        lightcurve flux.
+    :param symmetric: Boolean. If True, a symmetric error on magnitude
+        is provided.
+    :return: Astropy table of lightcurve and exposure information for
+        the provided cadence.
+    """
+
+    pix_scale = transformmatrix_to_pixelscale(transform_pix2angle)
+    copied_exposure_data = exposure_data.copy()
+    # choose the transient start point (optimized cadence or random).
+ min_lc_time = min(lens_class.source[0].lightcurve_time) + max_lc_time = max(lens_class.source[0].lightcurve_time) + start_point_mjd_time = optimized_transient_event_time_mjd( + mjd_times=copied_exposure_data["obs_time"], + lightcurve_range=(min_lc_time, max_lc_time), + min_points=min_points, + optimized_cadence=optimized_cadence, + ) + obs_time_in_days = convert_mjd_to_days( + copied_exposure_data["obs_time"], start_point_mjd_time + ) + obs_time_in_days_col = Column(name="obs_time_in_days", data=obs_time_in_days) + copied_exposure_data.add_column(obs_time_in_days_col) + copied_exposure_data = copied_exposure_data[ + (copied_exposure_data["obs_time_in_days"] >= min_lc_time - 50) + & (copied_exposure_data["obs_time_in_days"] <= max_lc_time) + ] + ra_dec_list = copied_exposure_data["calexp_center"][0] + lens_id = [ + lens_class.generate_id(ra=ra_dec_list[0].degree, dec=ra_dec_list[1].degree) + ] * len(copied_exposure_data["obs_time"]) + magnitude_image_1 = [] + magnitude_image_2 = [] + magnitude_image_3 = [] + magnitude_image_4 = [] + mag_error_1_minus = [] + mag_error_1_plus = [] + mag_error_2_minus = [] + mag_error_2_plus = [] + mag_error_3_minus = [] + mag_error_3_plus = [] + mag_error_4_minus = [] + mag_error_4_plus = [] + image_list = [] + # store data for each observations. + for obs in range(len(copied_exposure_data["obs_time"])): + + exposure_data_obs = copied_exposure_data[obs] + observation_time = exposure_data_obs["obs_time_in_days"] + magnitude = lens_class.point_source_magnitude( + band=exposure_data_obs["band"], lensed=True, time=observation_time + ) + # computing necessary quantities in flux unit + amplitude_image_1 = magnitude_to_amplitude( + magnitude[0][0], exposure_data_obs["zero_point"] + ) + amplitude_image_2 = magnitude_to_amplitude( + magnitude[0][1], exposure_data_obs["zero_point"] + ) + total_counts_1 = amplitude_image_1 * exposure_data_obs["expo_time"] + total_counts_2 = amplitude_image_2 * exposure_data_obs["expo_time"] + bkg_noise = exposure_data_obs["bkg_noise"] + + fwhm = exposure_data_obs["psf_fwhm"] + N_pix = np.pi * (2 * fwhm) ** 2 / (pix_scale**2) + sigma_noise_total = bkg_noise / np.sqrt(N_pix) + # sigma_noise_total = np.sqrt(bkg_noise*N_pix) + poisson_noise_1 = np.sqrt(total_counts_1) + poisson_noise_2 = np.sqrt(total_counts_2) + # magnitude and magnitude errors for image 1 and 2. + flux_err_1 = np.sqrt(sigma_noise_total**2 + poisson_noise_1**2) + flux_err_2 = np.sqrt(sigma_noise_total**2 + poisson_noise_2**2) + (mag_gaussian_realiz_1, mag_err_1_minus, mag_err_1_plus) = ( + flux_error_to_magnitude_error( + amplitude_image_1, + flux_err_1, + exposure_data_obs["zero_point"], + noise=noise, + symmetric=symmetric, + ) + ) + (mag_gaussian_realiz_2, mag_err_2_minus, mag_err_2_plus) = ( + flux_error_to_magnitude_error( + amplitude_image_2, + flux_err_2, + exposure_data_obs["zero_point"], + noise=noise, + symmetric=symmetric, + ) + ) + magnitude_image_1.append(mag_gaussian_realiz_1) + mag_error_1_minus.append(mag_err_1_minus) + mag_error_1_plus.append(mag_err_1_plus) + magnitude_image_2.append(mag_gaussian_realiz_2) + mag_error_2_minus.append(mag_err_2_minus) + mag_error_2_plus.append(mag_err_2_plus) + + # produce an image of the lensed transient and store. 
+        lens_image_single = lens_image(
+            lens_class,
+            band=exposure_data_obs["band"],
+            mag_zero_point=exposure_data_obs["zero_point"],
+            num_pix=num_pix,
+            psf_kernel=exposure_data_obs["psf_kernel"],
+            transform_pix2angle=transform_pix2angle,
+            exposure_time=exposure_data_obs["expo_time"],
+            t_obs=observation_time,
+            std_gaussian_noise=exposure_data_obs["bkg_noise"],
+        )
+        image_list.append(lens_image_single)
+        # magnitude and magnitude errors for image 3.
+        if lens_class.image_number[0] > 2:
+            amplitude_image_3 = magnitude_to_amplitude(
+                magnitude[0][2], exposure_data_obs["zero_point"]
+            )
+            total_counts_3 = amplitude_image_3 * exposure_data_obs["expo_time"]
+            poisson_noise_3 = np.sqrt(total_counts_3)
+            flux_err_3 = np.sqrt(sigma_noise_total**2 + poisson_noise_3**2)
+            (mag_gaussian_realiz_3, mag_err_3_minus, mag_err_3_plus) = (
+                flux_error_to_magnitude_error(
+                    amplitude_image_3,
+                    flux_err_3,
+                    exposure_data_obs["zero_point"],
+                    noise=noise,
+                    symmetric=symmetric,
+                )
+            )
+            magnitude_image_3.append(mag_gaussian_realiz_3)
+            mag_error_3_minus.append(mag_err_3_minus)
+            mag_error_3_plus.append(mag_err_3_plus)
+        # magnitude and magnitude errors for image 4.
+        if lens_class.image_number[0] > 3:
+            amplitude_image_4 = magnitude_to_amplitude(
+                magnitude[0][3], exposure_data_obs["zero_point"]
+            )
+            total_counts_4 = amplitude_image_4 * exposure_data_obs["expo_time"]
+            poisson_noise_4 = np.sqrt(total_counts_4)
+            flux_err_4 = np.sqrt(sigma_noise_total**2 + poisson_noise_4**2)
+            (mag_gaussian_realiz_4, mag_err_4_minus, mag_err_4_plus) = (
+                flux_error_to_magnitude_error(
+                    amplitude_image_4,
+                    flux_err_4,
+                    exposure_data_obs["zero_point"],
+                    noise=noise,
+                    symmetric=symmetric,
+                )
+            )
+            magnitude_image_4.append(mag_gaussian_realiz_4)
+            mag_error_4_minus.append(mag_err_4_minus)
+            mag_error_4_plus.append(mag_err_4_plus)
+        # pad unused image columns so every list keeps the same length
+        # (doubles have no image 3 or 4; triples have no image 4).
+        if lens_class.image_number[0] < 3:
+            magnitude_image_3.append(-1)
+            mag_error_3_minus.append(-1)
+            mag_error_3_plus.append(-1)
+        if lens_class.image_number[0] < 4:
+            magnitude_image_4.append(-1)
+            mag_error_4_minus.append(-1)
+            mag_error_4_plus.append(-1)
+    # Create an astropy table of transient data.
+ lens_id_col = Column(name="lens_id", data=np.array(lens_id).squeeze()) + magnitude_col_image_1 = Column( + name="mag_image_1", data=np.array(magnitude_image_1).squeeze() + ) + magnitude_col_image_2 = Column( + name="mag_image_2", data=np.array(magnitude_image_2).squeeze() + ) + magnitude_col_image_3 = Column( + name="mag_image_3", data=np.array(magnitude_image_3).squeeze() + ) + magnitude_col_image_4 = Column( + name="mag_image_4", data=np.array(magnitude_image_4).squeeze() + ) + total_mag_error_col_1_low = Column( + name="mag_error_image_1_low", data=np.array(mag_error_1_minus).squeeze() + ) + total_mag_error_col_1_high = Column( + name="mag_error_image_1_high", data=np.array(mag_error_1_plus).squeeze() + ) + total_mag_error_col_2_low = Column( + name="mag_error_image_2_low", data=np.array(mag_error_2_minus).squeeze() + ) + total_mag_error_col_2_high = Column( + name="mag_error_image_2_high", data=np.array(mag_error_2_plus).squeeze() + ) + total_mag_error_col_3_low = Column( + name="mag_error_image_3_low", data=np.array(mag_error_3_minus).squeeze() + ) + total_mag_error_col_3_high = Column( + name="mag_error_image_3_high", data=np.array(mag_error_3_plus).squeeze() + ) + total_mag_error_col_4_low = Column( + name="mag_error_image_4_low", data=np.array(mag_error_4_minus).squeeze() + ) + total_mag_error_col_4_high = Column( + name="mag_error_image_4_high", data=np.array(mag_error_4_plus).squeeze() + ) + image_list_col = Column(name="lens_image", data=image_list) + # add these new quantities as new columns in the expo_data table. + copied_exposure_data.add_columns( + [ + lens_id_col, + magnitude_col_image_1, + magnitude_col_image_2, + magnitude_col_image_3, + magnitude_col_image_4, + total_mag_error_col_1_low, + total_mag_error_col_1_high, + total_mag_error_col_2_low, + total_mag_error_col_2_high, + total_mag_error_col_3_low, + total_mag_error_col_3_high, + total_mag_error_col_4_low, + total_mag_error_col_4_high, + image_list_col, + ] + ) + return copied_exposure_data + + +def extract_lightcurves_in_different_bands(data, images=True): + """Extract lightcurves and images in different bands from the given + catalog. + + :param data: An astropy table containing lightcurves in a certain + cadence. This table must contain magnitude and corresponding + errors. The column name for magnitude should be "mag_image_n", + and column names for the error should be "mag_error_image_n_low" + and "mag_error_image_n_high", where n could be 1, 2, 3, or 4. + :param images: Boolean, if True, extracts image lists by band. + Default is True. + :return: A dictionary of magnitudes, associated errors, observation + times, and optionally image lists, structured by image or band. 
+ """ + table = data + # Extract unique bands + bands = np.unique(table["band"]) + + # Initialize dictionaries to hold magnitudes, errors, observation times, and + # optionally image lists + magnitudes = {f"mag_image_{i}": {band: [] for band in bands} for i in range(1, 5)} + errors_low = { + f"mag_error_image_{i}_low": {band: [] for band in bands} for i in range(1, 5) + } + errors_high = { + f"mag_error_image_{i}_high": {band: [] for band in bands} for i in range(1, 5) + } + obs_time = {band: [] for band in bands} + image_lists = {band: [] for band in bands} if images else None + + # Populate dictionaries with magnitudes, errors, and optionally image lists + # corresponding to each band + for band in bands: + # Filter rows that correspond to the current band + band_data = table[table["band"] == band] + obs_time[band] = band_data["obs_time"].tolist() + + # Populate image lists (not tied to specific magnitudes) if images=True + if images: + image_lists[band] = band_data["lens_image"].tolist() + + for i in range(1, 5): + mag_col = f"mag_image_{i}" + err_low_col = f"mag_error_image_{i}_low" + err_high_col = f"mag_error_image_{i}_high" + + if mag_col in band_data.colnames and np.any(band_data[mag_col] != -1): + # Only proceed with the column if the magnitude is not -1 + magnitudes[mag_col][band] = band_data[mag_col].tolist() + errors_low[err_low_col][band] = band_data[err_low_col].tolist() + errors_high[err_high_col][band] = band_data[err_high_col].tolist() + + result = { + "magnitudes": magnitudes, + "errors_low": errors_low, + "errors_high": errors_high, + "obs_time": obs_time, + } + + if images: + result["image_lists"] = image_lists + + return result diff --git a/slsim/Plots/plot_functions.py b/slsim/Plots/plot_functions.py index 62bdfa43f..95369c340 100644 --- a/slsim/Plots/plot_functions.py +++ b/slsim/Plots/plot_functions.py @@ -129,3 +129,156 @@ def plot_montage_of_random_injected_lens(image_list, num, n_horizont=1, n_vertic left=None, bottom=None, right=None, top=None, wspace=0.0, hspace=0.05 ) return fig + + +def plot_lightcurves(data, images=True): + """Plots lightcurves dynamically for all available images across different + bands, excluding empty images. + + :param data: Dictionary returned by `extract_lightcurves_in_different_bands`. + :param images: Boolean. If True, plots some sample images in each bands. + :return: lightcurve plots (and image motage). + """ + magnitudes = data["magnitudes"] + errors_low = data["errors_low"] + errors_high = data["errors_high"] + obs_time = data["obs_time"] + if images: + image_lists = data["image_lists"] + + # Extract all bands and filter out bands where all magnitudes are not NaN across. 
+ bands = [ + band + for band in obs_time.keys() + if any( + not np.all(np.isnan(magnitudes[image_key][band])) + for image_key in magnitudes.keys() + if image_key.startswith("mag_image_") + ) + ] + + # Identify non-empty magnitudes dynamically + image_keys = [] + for key in magnitudes.keys(): + if key.startswith("mag_image_"): + is_non_empty = any( + not np.all(np.isnan(magnitudes[key][band])) for band in bands + ) + if is_non_empty: + image_keys.append(key) + + # Prepare the plot grid: rows for bands, columns for images + + # optional images montage + fig, axs = plt.subplots( + nrows=len(bands), + ncols=len(image_keys) + (1 if images else 0), + figsize=(12, 6), + gridspec_kw={"hspace": 0.6, "wspace": 0.3}, + ) + + # Adjust axes for single-row scenarios + if len(bands) == 1: + axs = axs[np.newaxis, :] # Ensure axs is 2D + + # Add titles for each column + for col_idx, image_key in enumerate(image_keys): + axs[0, col_idx].set_title( + f"Lightcurves of image {col_idx+1}", fontsize=12, loc="center" + ) + if images: + axs[0, -1].set_title("Lens Images", fontsize=12, loc="center") + + # Plot data for each band + for row_idx, band in enumerate(bands): + band_time = obs_time[band] + + for col_idx, image_key in enumerate(image_keys): + mag_band = magnitudes[image_key][band] + err_low_band = errors_low[ + f"{image_key.replace('mag_image', 'mag_error_image')}_low" + ][band] + err_high_band = errors_high[ + f"{image_key.replace('mag_image', 'mag_error_image')}_high" + ][band] + err_band = [err_low_band, err_high_band] + + # Plot the lightcurve for the current image + axs[row_idx, col_idx].errorbar( + band_time, + mag_band, + yerr=err_band, + fmt=".", + label=f"{band}-band", + color=f"C{row_idx}", + alpha=0.7, + ) + axs[row_idx, col_idx].set_ylabel(f"Mag_{band}", fontsize=10) + axs[row_idx, col_idx].invert_yaxis() + axs[row_idx, col_idx].tick_params(axis="both", labelsize=8) + + # Add x-label only for the bottom row + if row_idx == len(bands) - 1: + axs[row_idx, col_idx].set_xlabel("MJD [Days]", fontsize=10) + + # Display lens images montage if image_lists is available + if images: + montage = ( + create_montage(image_lists[band]) + if image_lists[band] + else np.zeros((10, 10)) + ) + axs[row_idx, -1].imshow(montage, cmap="viridis", origin="lower") + axs[row_idx, -1].axis("off") + + # Adjust layout to avoid overlaps + plt.tight_layout() + return fig + + +def create_montage(images_band, grid_size=None): + """Creates a montage from a list of images, limited to the first 3 images, + with consistent scaling. This function is a helper function for + plot_lightcurves() function. + + :param images_band: List of 2D NumPy arrays representing images. + :param grid_size: Tuple specifying the grid dimensions (rows, cols). + If None, calculates the grid size to be approximately square. + :return: 2D NumPy array representing the montage. 
"""
+    # Limit to the first 3 images
+    images_band = images_band[:3]
+
+    # Ensure all elements in images_band are 2D NumPy arrays
+    images_band = [np.array(img) for img in images_band]
+
+    # Determine the global minimum and maximum pixel values across all images
+    global_min = min(np.min(img) for img in images_band)
+    global_max = max(np.max(img) for img in images_band)
+
+    # Normalize all images to the range [0, 1] based on global min and max
+    normalized_images = [
+        (img - global_min) / (global_max - global_min) for img in images_band
+    ]
+
+    # Determine grid size if not provided
+    n_images = len(normalized_images)
+    if grid_size is None:
+        grid_cols = n_images
+        grid_rows = 1
+    else:
+        grid_rows, grid_cols = grid_size
+
+    # Determine the size of each image
+    img_h, img_w = normalized_images[0].shape  # Assuming all images have the same shape
+
+    # Create an empty array for the montage
+    montage = np.zeros((grid_rows * img_h, grid_cols * img_w))
+
+    # Fill the montage with images
+    for idx, image in enumerate(normalized_images):
+        row = idx // grid_cols
+        col = idx % grid_cols
+        montage[row * img_h : (row + 1) * img_h, col * img_w : (col + 1) * img_w] = (
+            image
+        )
+    return montage
diff --git a/slsim/Util/param_util.py b/slsim/Util/param_util.py
index 76fc57ce7..1c2992659 100644
--- a/slsim/Util/param_util.py
+++ b/slsim/Util/param_util.py
@@ -382,3 +382,39 @@ def transient_event_time_mjd(min_mjd, max_mjd):
     """
     start_mjd = np.random.randint(min_mjd, max_mjd)
     return start_mjd
+
+
+def flux_error_to_magnitude_error(
+    flux_mean, flux_error, mag_zero_point, noise=True, symmetric=False
+):
+    """Computes the mean magnitude and corresponding errors from the
+    provided mean flux and associated error.
+
+    :param flux_mean: mean flux of a transient.
+    :param flux_error: error in the mean flux.
+    :param mag_zero_point: magnitude zero point of the observation.
+    :param noise: Boolean. If True, Gaussian noise is added to the
+        lightcurve flux.
+    :param symmetric: Boolean. If True, a symmetric error on magnitude
+        is provided.
+    :return: mean magnitude and associated errors.
+ """ + mag_mean = amplitude_to_magnitude(flux_mean, mag_zero_point) + if symmetric is False: + upper_flux_limit = flux_mean + flux_error + lower_flux_limit = flux_mean - flux_error + if lower_flux_limit <= 0: + lower_flux_limit = flux_mean * 0.01 + lower_mag_limit = amplitude_to_magnitude(upper_flux_limit, mag_zero_point) + upper_mag_limit = amplitude_to_magnitude(lower_flux_limit, mag_zero_point) + mag_error_upper = upper_mag_limit - mag_mean + mag_error_lower = mag_mean - lower_mag_limit + else: + mag_error = (2.5 / np.log(10)) * flux_error / flux_mean + mag_error_upper = mag_error + mag_error_lower = mag_error + if noise is True: + flux_mean_noise = flux_mean + np.random.normal(0.0, flux_error) + mag_mean_noise = amplitude_to_magnitude(flux_mean_noise, mag_zero_point) + return mag_mean_noise, mag_error_lower, mag_error_upper + return mag_mean, mag_error_lower, mag_error_upper diff --git a/slsim/image_simulation.py b/slsim/image_simulation.py index e9d22de5d..521b38a6e 100644 --- a/slsim/image_simulation.py +++ b/slsim/image_simulation.py @@ -584,6 +584,7 @@ def lens_image( transform_pix2angle=transform_pix2angle, time=t_obs, ) + image_ps = np.nan_to_num(image_ps, nan=0) # Replace NaN if present with 0 image = convolved_deflector_source + image_ps if exposure_time is not None: final_image = image_plus_poisson_noise(image=image, exposure_time=exposure_time) diff --git a/slsim/lens.py b/slsim/lens.py index 2ebbd3ee1..8ae996208 100644 --- a/slsim/lens.py +++ b/slsim/lens.py @@ -972,6 +972,32 @@ def kappa_star(self, ra, dec): ) return kappa_star + def generate_id(self, ra=None, dec=None): + """Generate a unique ID for the lens based on its position. + + :param ra: ra coordinate of the Lens + :param dec: dec coordinate of the Lens + :return: A string representing the lens ID. 
+ """ + if ra is None and dec is None: + ra = self.deflector_position[0] + dec = self.deflector_position[1] + else: + ra = ra + dec = dec + if self._source_type == "extended": + lens_type = "GG" + elif ( + self._source_type == "point_source" + or self._source_type == "point_plus_extended" + ): + if self.max_redshift_source_class.sn_type is not None: + lens_type = "SN" + self.max_redshift_source_class.sn_type + else: + lens_type = "QSO" + + return f"{lens_type}-LENS-{ra:.4f}_{dec:.4f}" + def image_separation_from_positions(image_positions): """Calculate image separation in arc-seconds; if there are only two images, diff --git a/tests/TestData/expo_data_opsim.hdf5 b/tests/TestData/expo_data_opsim.hdf5 index 43ee8d97f..33acb6744 100644 Binary files a/tests/TestData/expo_data_opsim.hdf5 and b/tests/TestData/expo_data_opsim.hdf5 differ diff --git a/tests/test_LsstSciencePipeline/test_opsim_pipeline.py b/tests/test_LsstSciencePipeline/test_opsim_pipeline.py index abaa05713..4b49b7564 100644 --- a/tests/test_LsstSciencePipeline/test_opsim_pipeline.py +++ b/tests/test_LsstSciencePipeline/test_opsim_pipeline.py @@ -4,9 +4,16 @@ from astropy.cosmology import FlatLambdaCDM from slsim.lens import Lens from slsim.LsstSciencePipeline.opsim_pipeline import opsim_time_series_images_data -from slsim.LsstSciencePipeline.util_lsst import opsim_variable_lens_injection +from slsim.LsstSciencePipeline.util_lsst import ( + opsim_variable_lens_injection, + optimized_transient_event_time_mjd, + transient_data_with_cadence, + extract_lightcurves_in_different_bands, +) from slsim.Sources.source import Source from slsim.Deflectors.deflector import Deflector +import astropy.coordinates as coord +import astropy.units as u import pytest @@ -121,9 +128,7 @@ def test_opsim_variable_lens_injection(pes_lens_instance): # Load example opsim data format path = os.path.dirname(__file__) module_path, _ = os.path.split(path) - expo_data = Table.read( - os.path.join(path, "../TestData/expo_data_opsim.hdf5"), path="data" - ) + expo_data = Table.read(os.path.join(path, "../TestData/expo_data_opsim.hdf5")) transform_pix2angle = np.array([[0.2, 0], [0, 0.2]]) bands = ["g", "r", "i"] @@ -142,4 +147,173 @@ def test_opsim_variable_lens_injection(pes_lens_instance): assert len(results) == len(expo_data[mask]) -test_opsim_time_series_images_data() +def test_transient_event_time_basic(): + mjd_times = np.linspace(58000, 58100, 200) + lightcurve_range = (-50, 100) + min_points = 100 + + result = optimized_transient_event_time_mjd(mjd_times, lightcurve_range, min_points) + + assert result is not None + start, end = result + lightcurve_range[0], result + lightcurve_range[1] + points_in_range = np.sum((mjd_times >= start) & (mjd_times <= end)) + assert points_in_range >= min_points + + +def test_transient_event_time_no_valid_start(): + mjd_times = np.linspace(58000, 58100, 50) + lightcurve_range = (-50, 100) + min_points = 100 + + result = optimized_transient_event_time_mjd(mjd_times, lightcurve_range, min_points) + + assert 58000 <= result <= 58100 + + +def test_transient_event_time_no_optimized_cadence(): + mjd_times = np.linspace(58000, 58100, 50) + lightcurve_range = (-50, 100) + result = optimized_transient_event_time_mjd( + mjd_times, lightcurve_range, optimized_cadence=False + ) + assert 58000 <= result <= 58100 + + +@pytest.fixture +def lens_class_instance(): + cosmo = FlatLambdaCDM(H0=70, Om0=0.3) + path = os.path.dirname(__file__) + source_dict1 = Table.read( + os.path.join(path, "../TestData/source_supernovae_new.fits"), 
format="fits" + ) + deflector_dict = Table.read( + os.path.join(path, "../TestData/deflector_supernovae_new.fits"), format="fits" + ) + + deflector_dict_ = dict(zip(deflector_dict.colnames, deflector_dict[0])) + gamma_pl = 1.8 + deflector_dict_["gamma_pl"] = gamma_pl + while True: + source1 = Source( + source_dict=source_dict1, + cosmo=cosmo, + source_type="point_plus_extended", + light_profile="double_sersic", + lightcurve_time=np.linspace(-50, 50, 50), + variability_model="light_curve", + kwargs_variability={"supernovae_lightcurve", "i", "r", "z", "g", "y"}, + sn_type="Ia", + sn_absolute_mag_band="bessellb", + sn_absolute_zpsys="ab", + ) + deflector = Deflector( + deflector_type="EPL", + deflector_dict=deflector_dict_, + sis_convention=False, + ) + + lens_class1 = Lens( + deflector_class=deflector, + source_class=source1, + cosmo=cosmo, + ) + if lens_class1.validity_test(): + lens_class = lens_class1 + break + return lens_class + + +@pytest.fixture +def exposure_data(): + num_obs = 20 + obs_times = np.linspace(59000, 59050, num_obs) + bkg_noise = np.random.uniform(0.1, 0.5, num_obs) + psf_fwhm = np.random.uniform(0.6, 1.2, num_obs) + zero_points = np.random.uniform(25, 30, num_obs) + expo_times = np.random.uniform(30, 60, num_obs) + bands = ["g", "r", "i", "z"] * (num_obs // 4 + 1) + + # Create ra and dec values as Angle objects with units + ra_points = coord.Angle(np.random.uniform(low=0, high=360, size=num_obs) * u.degree) + ra_points = ra_points.wrap_at(180 * u.degree) + + dec_uniform = np.random.uniform( + low=np.sin(np.radians(-75)), high=np.sin(np.radians(5)), size=num_obs + ) + dec_points = coord.Angle(np.degrees(np.arcsin(dec_uniform)) * u.degree) + + # Combine ra_points and dec_points as a list of tuples, preserving units + calexp_centers = list(zip(ra_points, dec_points)) + path = os.path.dirname(__file__) + psf_kernel = np.load( + os.path.join(path, "../TestData/psf_kernels_for_deflector.npy") + ) + psf_kernel_list = [psf_kernel] * num_obs + return Table( + { + "obs_time": obs_times, + "bkg_noise": bkg_noise, + "psf_fwhm": psf_fwhm, + "zero_point": zero_points, + "expo_time": expo_times, + "calexp_center": calexp_centers, + "band": bands[:num_obs], + "psf_kernel": psf_kernel_list, + } + ) + + +def test_transient_data_with_cadence(lens_class_instance, exposure_data): + result = transient_data_with_cadence( + lens_class=lens_class_instance, + exposure_data=exposure_data, + transform_pix2angle=np.array([[0.2, 0], [0, 0.2]]), + num_pix=61, + min_points=5, + ) + lightcurves = extract_lightcurves_in_different_bands(result) + expected_keys = lightcurves.keys() + colname = result.colnames + assert isinstance(result, Table) + assert len(result) >= 5 + assert len(colname) == 23 # 8 already existing col and 15 newly added. 
+ assert "obs_time_in_days" in colname + assert "lens_id" in colname + assert "mag_image_1" in colname + assert "mag_image_2" in colname + assert "mag_image_3" in colname + assert "mag_image_4" in colname + assert "mag_error_image_1_low" in colname + assert "mag_error_image_1_high" in colname + assert "mag_error_image_2_low" in colname + assert "mag_error_image_2_high" in colname + assert "mag_error_image_3_low" in colname + assert "mag_error_image_3_high" in colname + assert "mag_error_image_4_low" in colname + assert "mag_error_image_4_high" in colname + assert "lens_image" in colname + + assert "magnitudes" in expected_keys + assert "errors_low" in expected_keys + assert "errors_high" in expected_keys + assert "obs_time" in expected_keys + assert "image_lists" in expected_keys + + results_i = result[result["band"] == "i"] + mag_i = results_i["mag_image_1"] + final_lightcurve_i = lightcurves["magnitudes"]["mag_image_1"]["i"] + assert np.all(np.array(list(mag_i))) == np.all(np.array(final_lightcurve_i)) + error_i_low = results_i["mag_error_image_1_low"] + error_i_high = results_i["mag_error_image_1_high"] + lightcurve_error_i_low = lightcurves["errors_low"]["mag_error_image_1_low"]["i"] + lightcurve_error_i_high = lightcurves["errors_high"]["mag_error_image_1_high"]["i"] + assert np.all(np.array(list(error_i_low))) == np.all( + np.array(lightcurve_error_i_low) + ) + assert np.all(np.array(list(error_i_high))) == np.all( + np.array(lightcurve_error_i_high) + ) + + +if __name__ == "__main__": + pytest.main() diff --git a/tests/test_Plots/test_plot_functions.py b/tests/test_Plots/test_plot_functions.py index 3a17bfd58..7f3eb4b88 100644 --- a/tests/test_Plots/test_plot_functions.py +++ b/tests/test_Plots/test_plot_functions.py @@ -7,6 +7,8 @@ from slsim.Plots.plot_functions import ( create_image_montage_from_image_list, plot_montage_of_random_injected_lens, + create_montage, + plot_lightcurves, ) from slsim.image_simulation import sharp_image from slsim.Sources.source import Source @@ -123,5 +125,86 @@ def test_plot_montage_of_random_injected_lens(quasar_lens_pop_instance): assert fig.get_size_inches()[0] == np.array([num_cols * 3, num_rows * 3])[0] +def test_create_montage_basics(): + images = [ + np.random.rand(5, 5), + np.random.rand(5, 5), + np.random.rand(5, 5), + np.random.rand(5, 5), + ] + montage = create_montage(images) + + # Check shape + assert montage.shape == (5, 15) # 1 row, 3 images wide + + # Check normalization range + assert np.min(montage) >= 0 + assert np.max(montage) <= 1 + + +def test_create_montage_specified_grid(): + images = [ + np.random.rand(5, 5), + np.random.rand(5, 5), + np.random.rand(5, 5), + ] + grid_size = (1, 3) + montage = create_montage(images, grid_size=grid_size) + + # Check shape + assert montage.shape == (5, 15) # 1 row, 3 images wide + + +def test_plot_lightcurves(): + data = { + "magnitudes": { + "mag_image_1": {"g": np.random.rand(5), "r": np.random.rand(5)}, + "mag_image_2": {"g": np.random.rand(5), "r": np.random.rand(5)}, + }, + "errors_low": { + "mag_error_image_1_low": {"g": np.random.rand(5), "r": np.random.rand(5)}, + "mag_error_image_2_low": {"g": np.random.rand(5), "r": np.random.rand(5)}, + }, + "errors_high": { + "mag_error_image_1_high": {"g": np.random.rand(5), "r": np.random.rand(5)}, + "mag_error_image_2_high": {"g": np.random.rand(5), "r": np.random.rand(5)}, + }, + "obs_time": {"g": np.arange(5), "r": np.arange(5)}, + "image_lists": { + "g": [np.random.rand(10, 10) for _ in range(3)], + "r": [np.random.rand(10, 10) for _ in 
range(3)], + }, + } + data2 = { + "magnitudes": { + "mag_image_1": {"g": np.random.rand(5)}, + "mag_image_2": {"g": np.random.rand(5)}, + }, + "errors_low": { + "mag_error_image_1_low": {"g": np.random.rand(5)}, + "mag_error_image_2_low": {"g": np.random.rand(5)}, + }, + "errors_high": { + "mag_error_image_1_high": {"g": np.random.rand(5)}, + "mag_error_image_2_high": {"g": np.random.rand(5)}, + }, + "obs_time": {"g": np.arange(5)}, + "image_lists": {"g": [np.random.rand(10, 10) for _ in range(3)]}, + } + + fig = plot_lightcurves(data, images=True) + fig2 = plot_lightcurves(data, images=False) + fig3 = plot_lightcurves(data2, images=True) + ax1 = fig.get_axes() + ax2 = fig2.get_axes() + ax3 = fig3.get_axes() + assert fig is not None + assert isinstance(fig, plt.Figure) + assert fig2 is not None + assert isinstance(fig2, plt.Figure) + assert len(ax1) == len(ax2) + 2 + assert len(ax3) == 3 + + if __name__ == "__main__": pytest.main() diff --git a/tests/test_Source/test_quasar_plus_galaxies.py b/tests/test_Source/test_quasar_plus_galaxies.py index ebdaff2b7..9cc34555b 100644 --- a/tests/test_Source/test_quasar_plus_galaxies.py +++ b/tests/test_Source/test_quasar_plus_galaxies.py @@ -25,7 +25,7 @@ def test_quasar_plus_galaxies(): "mag_r", "mag_i", "mag_z", - "mag_Y", + "mag_y", "ps_mag_r", "ps_mag_g", "ps_mag_i", diff --git a/tests/test_lens.py b/tests/test_lens.py index 3faa3a61d..59fdb023b 100644 --- a/tests/test_lens.py +++ b/tests/test_lens.py @@ -63,6 +63,16 @@ def setup_method(self): self.gg_lens = gg_lens break + def test_lens_id_gg(self): + lens_id = self.gg_lens.generate_id() + ra = self.gg_lens.deflector_position[0] + dec = self.gg_lens.deflector_position[1] + ra2 = 12.03736542 + dec2 = 35.17363534 + lens_id2 = self.gg_lens.generate_id(ra=ra2, dec=dec2) + assert lens_id == f"GG-LENS-{ra:.4f}_{dec:.4f}" + assert lens_id2 == f"GG-LENS-{ra2:.4f}_{dec2:.4f}" + def test_deflector_ellipticity(self): e1_light, e2_light, e1_mass, e2_mass = self.gg_lens.deflector_ellipticity() assert pytest.approx(e1_light, rel=1e-3) == -0.05661955320450283 @@ -303,6 +313,17 @@ def test_point_source_magnitude(pes_lens_instance): assert len(mag_unlensed) == 1 +def test_lens_id_qso(pes_lens_instance): + lens_id = pes_lens_instance.generate_id() + ra = pes_lens_instance.deflector_position[0] + dec = pes_lens_instance.deflector_position[1] + ra2 = 12.03736542 + dec2 = 35.17363534 + lens_id2 = pes_lens_instance.generate_id(ra=ra2, dec=dec2) + assert lens_id == f"QSO-LENS-{ra:.4f}_{dec:.4f}" + assert lens_id2 == f"QSO-LENS-{ra2:.4f}_{dec2:.4f}" + + @pytest.fixture def supernovae_lens_instance(): path = os.path.dirname(__file__) @@ -521,6 +542,19 @@ def supernovae_lens_instance_double_sersic_multisource(): return supernovae_lens +def test_lens_id_snia(supernovae_lens_instance_double_sersic_multisource): + lens_id = supernovae_lens_instance_double_sersic_multisource.generate_id() + ra = supernovae_lens_instance_double_sersic_multisource.deflector_position[0] + dec = supernovae_lens_instance_double_sersic_multisource.deflector_position[1] + ra2 = 12.03736542 + dec2 = 35.17363534 + lens_id2 = supernovae_lens_instance_double_sersic_multisource.generate_id( + ra=ra2, dec=dec2 + ) + assert lens_id == f"SNIa-LENS-{ra:.4f}_{dec:.4f}" + assert lens_id2 == f"SNIa-LENS-{ra2:.4f}_{dec2:.4f}" + + class TestMultiSource(object): def setup_method(self): np.random.seed(42) diff --git a/tests/test_param_util.py b/tests/test_param_util.py index d673d3999..6f3509b6b 100644 --- a/tests/test_param_util.py +++ b/tests/test_param_util.py 
@@ -18,6 +18,7 @@ catalog_with_angular_size_in_arcsec, convert_mjd_to_days, transient_event_time_mjd, + flux_error_to_magnitude_error, ) from slsim.Sources.SourceVariability.variability import Variability from astropy.io import fits @@ -263,5 +264,90 @@ def test_start_point_mjd_time(): assert 60000 <= result <= 60400 +def test_flux_error_to_magnitude_error_basic(): + flux_mean = 100.0 + flux_error = 10.0 + mag_zero_point = 27.0 + + mag_mean, mag_error_lower, mag_error_upper = flux_error_to_magnitude_error( + flux_mean=flux_mean, + flux_error=flux_error, + mag_zero_point=mag_zero_point, + noise=False, + symmetric=False, + ) + + expected_mag_mean = amplitude_to_magnitude(flux_mean, mag_zero_point) + assert np.isclose(mag_mean, expected_mag_mean), "Mag mean computation failed." + + upper_flux_limit = flux_mean + flux_error + lower_flux_limit = max(flux_mean - flux_error, flux_mean * 0.01) + + expected_mag_error_lower = expected_mag_mean - amplitude_to_magnitude( + upper_flux_limit, mag_zero_point + ) + expected_mag_error_upper = ( + amplitude_to_magnitude(lower_flux_limit, mag_zero_point) - expected_mag_mean + ) + + assert mag_error_lower == expected_mag_error_lower + assert mag_error_upper == expected_mag_error_upper + + +def test_flux_error_to_magnitude_error_symmetric(): + flux_mean = 100.0 + flux_error = 10.0 + mag_zero_point = 27.0 + + mag_mean, mag_error_lower, mag_error_upper = flux_error_to_magnitude_error( + flux_mean=flux_mean, + flux_error=flux_error, + mag_zero_point=mag_zero_point, + noise=False, + symmetric=True, + ) + + expected_mag_mean = amplitude_to_magnitude(flux_mean, mag_zero_point) + expected_mag_error = (2.5 / np.log(10)) * flux_error / flux_mean + + assert mag_mean == expected_mag_mean + assert mag_error_lower == expected_mag_error + assert mag_error_upper == expected_mag_error + + +def test_flux_error_to_magnitude_error_with_noise(): + flux_mean = 100.0 + flux_error = 10.0 + mag_zero_point = 27.0 + + mag_mean, mag_error_lower, mag_error_upper = flux_error_to_magnitude_error( + flux_mean=flux_mean, + flux_error=flux_error, + mag_zero_point=mag_zero_point, + noise=True, + symmetric=False, + ) + + # Noise makes mag_mean non-deterministic, so we validate the magnitude range + mag_mean_2 = amplitude_to_magnitude(flux_mean, mag_zero_point) + + assert ( + mag_mean_2 - 3 * mag_error_lower < mag_mean <= mag_mean_2 + 3 * mag_error_upper + ) + + +def test_flux_error_to_magnitude_error_negative_flux(): + mag_mean, mag_error_lower, mag_error_upper = flux_error_to_magnitude_error( + flux_mean=10.0, + flux_error=11.0, + mag_zero_point=27.0, + symmetric=False, + noise=False, + ) + expected_upper_mag = amplitude_to_magnitude(0.1, 27) + expected_upper_error = expected_upper_mag - mag_mean + assert expected_upper_error == mag_error_upper + + if __name__ == "__main__": pytest.main()
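A minimal end-to-end sketch of the cadence workflow introduced in this diff, assuming `lens_class` is an already validated Lens instance and `exposure_data` is an opsim exposure table carrying the columns described in the transient_data_with_cadence docstring (including the new "psf_fwhm" column); the pixel-scale matrix and num_pix mirror the values used in the new tests, and the output filename is arbitrary.

import numpy as np
from slsim.LsstSciencePipeline.util_lsst import (
    transient_data_with_cadence,
    extract_lightcurves_in_different_bands,
)
from slsim.Plots.plot_functions import plot_lightcurves

# 0.2 arcsec/pixel, as in the new tests
transform_pix2angle = np.array([[0.2, 0], [0, 0.2]])

# Per-exposure transient table: lensed image magnitudes, asymmetric errors,
# and a simulated cutout for every visit overlapping the light-curve window.
cadence_table = transient_data_with_cadence(
    lens_class=lens_class,
    exposure_data=exposure_data,
    transform_pix2angle=transform_pix2angle,
    num_pix=61,
    optimized_cadence=True,
    min_points=100,
)

# Regroup by band and image, then plot light curves with a small image montage.
lightcurves = extract_lightcurves_in_different_bands(cadence_table, images=True)
fig = plot_lightcurves(lightcurves, images=True)
fig.savefig("lensed_transient_lightcurves.png")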