From c9ee217465cd2e1303cc2e5440a973df4e8d9656 Mon Sep 17 00:00:00 2001 From: NigelHambly Date: Mon, 15 May 2023 20:02:26 +0100 Subject: [PATCH] Bug fixed for correct XP filtering --- .../5. Working with Gaia XP spectra.zpln | 162 +++++++++--------- .../5. Working with Gaia XP spectra..ipynb | 4 +- 2 files changed, 86 insertions(+), 80 deletions(-) diff --git a/Public Examples/5. Working with Gaia XP spectra.zpln b/Public Examples/5. Working with Gaia XP spectra.zpln index c43b31f..8994932 100644 --- a/Public Examples/5. Working with Gaia XP spectra.zpln +++ b/Public Examples/5. Working with Gaia XP spectra.zpln @@ -4,7 +4,7 @@ "title": "Introduction", "text": "%md\n\n\n \nThe bulk of Gaia XP spectra at Gaia DR3 are provided in a parametric \"continuous\" representational form (as opposed to conventional sampled form, i.e. fluxes in wavelength bins) in table `gaiadr3.xp_continuous_mean_spectrum`. Utilities for handling this form, including conversion to sampled form and plotting, are provided in a bespoke Python package [GaiaXPy](https://gaia-dpci.github.io/GaiaXPy-website/) which is available on this platform. A small subset of the XP spectra are provided also in sampled form in table `gaiadr3.xp_sampled_mean_spectrum` but these are for illustrative purposes only: users are strongly encouraged to familiarise themselves and work with the continuous representation, not least in order to handle correctly the statistical uncertainties inherent to the data.\n\nTo access GaiaXPy on this platform simply import the package as follows:\n\n import gaiaxpy\n\nthen all classes and utility functions etc. will be available.\n", "user": "NHambly", - "dateUpdated": "2022-09-01T15:34:36+0000", + "dateUpdated": "2023-05-15T14:43:12+0000", "progress": 0, "config": { "tableHide": false, @@ -38,11 +38,11 @@ "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1650981001262_1093264483", + "jobName": "paragraph_1682508842872_871207808", "id": "paragraph_1650981001262_1093264483", - "dateCreated": "2022-04-26T13:50:01+0000", - "dateStarted": "2022-09-01T15:34:36+0000", - "dateFinished": "2022-09-01T15:34:36+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:43:12+0000", + "dateFinished": "2023-05-15T14:43:12+0000", "status": "FINISHED", "focus": true, "$$hashKey": "object:196" @@ -51,7 +51,7 @@ "title": "Sampling and plotting spectra (continuous representation)", "text": "%pyspark\n\n# standard platform set-up\nimport gaiadmpsetup\n\n# utility code set-up\nfrom gaiaxpy import plot_spectra, convert\n\n# XP products available in Gaia DR3, so set the default database context accordingly for convenience\nspark.sql('USE gaiadr3')\n\n# grab an example spectrum from the table\ncontinuous_df = spark.sql('SELECT * FROM gaiadr3.xp_continuous_mean_spectrum WHERE source_id = 5853498713190525696')\n# ... this source identifier corresponds to Proxima Cen (= Alpha Cen C, spectral type M5.5V i.e. 
a mid-M dwarf)\n\n# convert to a Pandas dataframe for GaiaXPy\ncontinuous_spectrum = continuous_df.toPandas()\n\n# convert to sampled form:\nsampled_spectrum, sampling = convert(continuous_spectrum, save_file = False)\n \n# plot to sanity check:\nplot_spectra(sampled_spectrum, sampling = sampling, multi=False, show_plot=True, output_path=None, legend=True)\n\n", "user": "NHambly", - "dateUpdated": "2022-09-01T15:34:38+0000", + "dateUpdated": "2023-05-15T14:43:12+0000", "progress": 0, "config": { "tableHide": false, @@ -64,7 +64,7 @@ "colWidth": 12, "editorMode": "ace/mode/python", "fontSize": 9, - "editorHide": false, + "editorHide": true, "title": true, "results": {}, "enabled": true @@ -78,7 +78,7 @@ "msg": [ { "type": "TEXT", - "data": "Processing data [100%]\r \r\n" + "data": "WARNING: IERSStaleWarning: leap-second file is expired. [astropy.utils.iers.iers]\nCreated TAP+ (v1.2.1) - Connection:\n\tHost: gea.esac.esa.int\n\tUse HTTPS: True\n\tPort: 443\n\tSSL Port: 443\nCreated TAP+ (v1.2.1) - Connection:\n\tHost: geadata.esac.esa.int\n\tUse HTTPS: True\n\tPort: 443\n\tSSL Port: 443\nProcessing data [100%]\r \r\n" }, { "type": "HTML", @@ -95,19 +95,19 @@ "group": "spark", "values": [ { - "jobUrl": "http://zeppelin:4041/jobs/job?id=401", - "$$hashKey": "object:695" + "jobUrl": "http://zeppelin:4040/jobs/job?id=0", + "$$hashKey": "object:651" } ], "interpreterSettingId": "spark" } }, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1650981269844_2057939329", + "jobName": "paragraph_1682508842872_985995614", "id": "paragraph_1650981269844_2057939329", - "dateCreated": "2022-04-26T13:54:29+0000", - "dateStarted": "2022-09-01T15:34:38+0000", - "dateFinished": "2022-09-01T15:34:43+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:43:12+0000", + "dateFinished": "2023-05-15T14:43:54+0000", "status": "FINISHED", "$$hashKey": "object:197" }, @@ -115,7 +115,7 @@ "title": "Creating a single, externally calibrated spectrum from BP and RP", "text": "%pyspark\n\nfrom gaiaxpy import calibrate\n\n# GaiaXPy provides classes and methods to create an externally calibrated single spectrum from the internal XP continuous representation:\ncalibrated_spectrum, sampling = calibrate(continuous_spectrum, save_file = False)\n\n# plot it\nplot_spectra(calibrated_spectrum, sampling = sampling, legend = False)\n", "user": "NHambly", - "dateUpdated": "2022-09-01T15:34:48+0000", + "dateUpdated": "2023-05-15T14:43:54+0000", "progress": 0, "config": { "editorSetting": { @@ -129,7 +129,8 @@ "fontSize": 9, "title": true, "results": {}, - "enabled": true + "enabled": true, + "editorHide": false }, "settings": { "params": {}, @@ -151,11 +152,11 @@ "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662044943200_953477984", + "jobName": "paragraph_1682508842872_1691538784", "id": "paragraph_1662044943200_953477984", - "dateCreated": "2022-09-01T15:09:03+0000", - "dateStarted": "2022-09-01T15:34:48+0000", - "dateFinished": "2022-09-01T15:34:48+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:43:54+0000", + "dateFinished": "2023-05-15T14:43:54+0000", "status": "FINISHED", "$$hashKey": "object:198" }, @@ -163,7 +164,7 @@ "title": "Searching for similar spectra", "text": "%md\n\nThe code in the following cells illustrates a workflow where we trawl through the a large XP spectral data looking for spectra similar to a high signal-to-noise template example. 
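In outline, the statistic used below is the Mahalanobis distance d = sqrt((c_t - c)^T (C_t + C)^(-1) (c_t - c)) between the template coefficient vector c_t (with covariance C_t) and each candidate vector c (with covariance C) after scaling the candidate to the template's flux level; it is evaluated separately for BP and RP and the two distances are combined in quadrature. 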
The implementation takes advantage of the end-user programmability of the distributed query execution engine to return results in a reasonable time. The approach is to look for spectra having the same spectral shape, as expressed in the coefficients of the continuous representation.", "user": "NHambly", - "dateUpdated": "2022-09-01T15:35:03+0000", + "dateUpdated": "2023-05-15T14:43:54+0000", "progress": 0, "config": { "tableHide": false, @@ -197,19 +198,19 @@ "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662045004984_1851487345", + "jobName": "paragraph_1682508842873_183111412", "id": "paragraph_1662045004984_1851487345", - "dateCreated": "2022-09-01T15:10:04+0000", - "dateStarted": "2022-09-01T15:35:03+0000", - "dateFinished": "2022-09-01T15:35:03+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:43:54+0000", + "dateFinished": "2023-05-15T14:43:54+0000", "status": "FINISHED", "$$hashKey": "object:199" }, { "title": "Utility code", - "text": "%pyspark\n\nimport numpy as np\nimport pandas as pd\nfrom pyspark.sql.functions import pandas_udf\nfrom pyspark.sql import DataFrame\n#from scipy.stats import chi\n\n# static constants for use when reconstructing a covariance matrix from the flattened upper-triangular correlation matrix stored in xp_continuous_mean_spectrum\nNUM_XP_COEFFS = 55\n# these indexes define the column-major positions of the correlation vector elements in the lower triangle of the 2d correlation matrix ...\nlower_index = np.tril_indices(NUM_XP_COEFFS,-1)\n# ... note that numpy indexing is row-major. The upper triangular index is created from the lower, reflecting across the diagonal,\n# by transposing the axes - this results in the required column-major indexing for the upper part\nupper_index = (lower_index[1], lower_index[0])\n# correlation matrix, empty apart from unity on the diagonal (off-diagonal elements to be filled in on a case-by-case basis)\ncorrelation_matrix = np.diag(np.ones(NUM_XP_COEFFS))\n\ndef make_correlation_matrix(correlation_vector : np.ndarray) -> np.ndarray:\n '''\n Returns the fully populated 2d correlation matrix given the flattened, 1d upper-triangular off-diagonal\n elements of the same as persisted in the table of XP continuous representation spectra.\n '''\n # copy in unique, off-diagonal elements from the flattened correlation vector into the 2d-indexed positions\n correlation_matrix[upper_index] = correlation_vector\n correlation_matrix[lower_index] = correlation_vector\n \n # give back the complete correlation matrix\n return correlation_matrix\n\ndef make_covariance_matrix(complete_correlation_matrix : np.ndarray, coefficient_error_vector : np.ndarray) -> np.ndarray:\n '''\n Creates the fully reconstructed 2d covariance matrix of an XP continuous representation spectrum given the\n complete 2d correlation matrix and the vector of formal errors on the coefficients. Note that Gaia DPAC CU5 scale \n predicted coefficient errors by the standard deviation as a post-hoc correction to the formal (sqrt) variances.\n GaiaXPy actually reverses this scaling in computation of the covariance matrix to return the latter as exactly\n that produced as a result of the least-squares solution for the coefficients. 
Such a de-scaling of the errors\n is not applied here: we assume that the uncertainties are best represented with this scaling intact.\n '''\n # 2d matrix with the errors on the diagonal\n error_matrix = np.diag(coefficient_error_vector)\n \n # from the standard relationship between covariance and correlation\n return error_matrix @ (error_matrix @ complete_correlation_matrix)\n \ndef xp_mahalanobis_distance(coeff_vector_1 : np.ndarray, covariance_1 : np.ndarray,\n coeff_vector_2 : np.ndarray, error_vector_2 : np.ndarray, correlation_vector_2 : np.ndarray) -> float:\n '''\n Computes the Mahalanobis distance between two XP spectra given template coefficients and fully populated\n covariance for the first, and the data record of the second candidate spectrum, i.e. coefficients, errors and \n upper-triangular part of the correlation matrix in the continuous representation. The second set of\n coefficients and errors should be scaled to the same flux level as the template spectrum. This means that\n the distance returned quantifies how close the candidate SED shape is to that of the template regardless any\n difference in intrinsic luminosity.\n \n This function uses plain matrix inversion of the combined covariance matrix. This can be numerically unstable\n and result in a negative squared distance resulting in turn in a return value of not-a-number. It is up to \n the calling application to handle this condition.\n '''\n # second covariance matrix\n corr2 = make_correlation_matrix(correlation_vector_2)\n covariance_2 = make_covariance_matrix(corr2, error_vector_2)\n \n # form covariance of the coefficient difference vector as sum of individual covariance \n cocovar = covariance_1 + covariance_2\n \n # inverse of combined, scaled covariance\n cocovar_inv = np.linalg.inv(cocovar)\n\n # use these in the computation of the square of the Mahalanobis distance, e.g. see source linked from\n # https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.mahalanobis.html\n delta = coeff_vector_1 - coeff_vector_2\n d = np.sqrt(np.dot(np.dot(delta, cocovar_inv), delta))\n \n # resulting Mahalanobs distance follows chi distribution with NUM_XP_COEFFS degrees of freedom (M. Weiler, personal communication)\n # for identical spectra\n return d\n \ndef find_similar_continuous_spectra(data_frame : DataFrame, template_df : DataFrame) -> DataFrame:\n '''\n Given data frames defining a large set of XP spectra in continuous representation, \n and a single template example also in continuous representation, search the former for cases\n similar to the latter. The data frame of the set of spectra being\n searched is annotated with a dissimilarity statistic: the greater the value the more\n dissimilar is the candidate spectrum to the template given. 
By definition this statistic\n will be zero for the template spectrum if it is present in the set of candidates.\n \n Parameters:\n -----------\n data_frame : DataFrame()\n the data frame encapsulating the set of XP continuous representation spectra to be searched\n template_df : DataFrame()\n the template, also in XP continuous representation encapsulated in a data frame.\n \n return : DataFrame()\n a new data frame annotated with a dissimilarity (increasingly positive) statistic where \n zero indicates a perfect match.\n '''\n \n # convenience reference to template as a Row object:\n template_row = template_df.collect()[0]\n \n # extract the template arrays \n template_bp_coefficients = np.array(template_row['bp_coefficients']).reshape(-1)\n template_bp_coefficient_errors = np.array(template_row['bp_coefficient_errors']).reshape(-1)\n template_bp_correlations = np.array(template_row['bp_coefficient_correlations']).reshape(-1)\n template_rp_coefficients = np.array(template_row['rp_coefficients']).reshape(-1)\n template_rp_coefficient_errors = np.array(template_row['rp_coefficient_errors']).reshape(-1)\n template_rp_correlations = np.array(template_row['rp_coefficient_correlations']).reshape(-1)\n template_gmag = template_row['phot_g_mean_mag']\n\n # precompute the required vectors and matrices for the template\n bp_correl_mat = make_correlation_matrix(template_bp_correlations)\n template_bp_covariance = make_covariance_matrix(bp_correl_mat, template_bp_coefficient_errors)\n rp_correl_mat = make_correlation_matrix(template_rp_correlations)\n template_rp_covariance = make_covariance_matrix(rp_correl_mat, template_rp_coefficient_errors)\n \n # define a vectorised Pandas UDF against the template for comparison of other spectra against it\n @pandas_udf('float')\n def xp_is_similar(bp_coeffs:pd.Series, bp_coeff_errors:pd.Series, bp_correlations:pd.Series, \n rp_coeffs:pd.Series, rp_coeff_errors:pd.Series, rp_correlations:pd.Series,\n gmags:pd.Series) -> pd.Series:\n '''\n Create a similarity metric for the XP continuous representation coefficients with respect \n to those of a predefined template. Similarity is based on a combined BP and RP Mahalanobis\n distance between the coefficient sets with full accounting for covariance. Input arguments\n are series of the BP and RP coefficients, errors and correlation vectors as stored in table\n xp_continuous_mean_spectrum. 
The returned object is a corresponding series of floats of the\n quadrature sum of the BP and RP Mahalanobis distances between each candidate spectrum and\n the static template defined above.\n '''\n \n # initialise the results series\n results = pd.Series(np.full(bp_coeffs.size, 0.0))\n\n # iterate over the data series\n for i in range(bp_coeffs.size):\n \n # normalise the candidate flux coefficients and uncertainties to that of the template\n norm = 10.0**(0.4 * (template_gmag - gmags.iloc[i]))\n bp_coeffs_norm = bp_coeffs.iloc[i] / norm\n bp_coeff_errors_norm = bp_coeff_errors.iloc[i] / norm\n rp_coeffs_norm = rp_coeffs.iloc[i] / norm\n rp_coeff_errors_norm = rp_coeff_errors.iloc[i] / norm\n \n # Mahalanobis distances for the individual BP and RP coefficient sets normalised to the flux level of the template\n bp_mdist = xp_mahalanobis_distance(template_bp_coefficients, template_bp_covariance, bp_coeffs_norm, bp_coeff_errors_norm, bp_correlations.iloc[i])\n rp_mdist = xp_mahalanobis_distance(template_rp_coefficients, template_rp_covariance, rp_coeffs_norm, rp_coeff_errors_norm, rp_correlations.iloc[i])\n \n # combined Mahalanobis distance\n results.iloc[i] = np.sqrt(bp_mdist * bp_mdist + rp_mdist * rp_mdist)\n \n return results\n\n # add in the similarity statistic\n data_frame = data_frame.withColumn('xp_similar', xp_is_similar(\n data_frame.bp_coefficients, data_frame.bp_coefficient_errors, data_frame.bp_coefficient_correlations, \n data_frame.rp_coefficients, data_frame.rp_coefficient_errors, data_frame.rp_coefficient_correlations, data_frame.phot_g_mean_mag))\n \n # give back the full set filtering out any nulls (which may result from NaN individual Mahalanobis distances)\n return data_frame.filter(data_frame.xp_similar.isNotNull())\n\n", + "text": "%pyspark\n\nimport numpy as np\nimport pandas as pd\nfrom pyspark.sql.functions import pandas_udf\nfrom pyspark.sql import DataFrame\n#from scipy.stats import chi\n\n# static constants for use when reconstructing a covariance matrix from the flattened upper-triangular correlation matrix stored in xp_continuous_mean_spectrum\nNUM_XP_COEFFS = 55\n# these indexes define the column-major positions of the correlation vector elements in the lower triangle of the 2d correlation matrix ...\nlower_index = np.tril_indices(NUM_XP_COEFFS,-1)\n# ... note that numpy indexing is row-major. 
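(For example, with NUM_XP_COEFFS = 3, np.tril_indices(3, -1) gives row indices (1, 2, 2) and column indices (0, 0, 1); swapping these two arrays addresses the positions (0,1), (0,2) and (1,2), i.e. the upper triangle in column-major order.) 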
The upper triangular index is created from the lower, reflecting across the diagonal,\n# by transposing the axes - this results in the required column-major indexing for the upper part\nupper_index = (lower_index[1], lower_index[0])\n# correlation matrix, empty apart from unity on the diagonal (off-diagonal elements to be filled in on a case-by-case basis)\ncorrelation_matrix = np.diag(np.ones(NUM_XP_COEFFS))\n\ndef make_correlation_matrix(correlation_vector : np.ndarray) -> np.ndarray:\n '''\n Returns the fully populated 2d correlation matrix given the flattened, 1d upper-triangular off-diagonal\n elements of the same as persisted in the table of XP continuous representation spectra.\n '''\n # copy in unique, off-diagonal elements from the flattened correlation vector into the 2d-indexed positions\n correlation_matrix[upper_index] = correlation_vector\n correlation_matrix[lower_index] = correlation_vector\n \n # give back the complete correlation matrix\n return correlation_matrix\n\ndef make_covariance_matrix(complete_correlation_matrix : np.ndarray, coefficient_error_vector : np.ndarray) -> np.ndarray:\n '''\n Creates the fully reconstructed 2d covariance matrix of an XP continuous representation spectrum given the\n complete 2d correlation matrix and the vector of formal errors on the coefficients. Note that Gaia DPAC CU5 scale \n predicted coefficient errors by the standard deviation as a post-hoc correction to the formal (sqrt) variances.\n GaiaXPy actually reverses this scaling in computation of the covariance matrix to return the latter as exactly\n that produced as a result of the least-squares solution for the coefficients. Such a de-scaling of the errors\n is not applied here: we assume that the uncertainties are best represented with this scaling intact.\n '''\n # 2d matrix with the errors on the diagonal\n error_matrix = np.diag(coefficient_error_vector)\n \n # from the standard relationship between covariance and correlation: covariance = D . correlation . D, with D = diag(errors)\n return error_matrix @ complete_correlation_matrix @ error_matrix\n \ndef xp_mahalanobis_distance(coeff_vector_1 : np.ndarray, covariance_1 : np.ndarray,\n coeff_vector_2 : np.ndarray, error_vector_2 : np.ndarray, correlation_vector_2 : np.ndarray) -> float:\n '''\n Computes the Mahalanobis distance between two XP spectra given template coefficients and fully populated\n covariance for the first, and the data record of the second candidate spectrum, i.e. coefficients, errors and \n upper-triangular part of the correlation matrix in the continuous representation. The second set of\n coefficients and errors should be scaled to the same flux level as the template spectrum. This means that\n the distance returned quantifies how close the candidate SED shape is to that of the template regardless of any\n difference in intrinsic luminosity.\n \n This function uses plain matrix inversion of the combined covariance matrix. This can be numerically unstable\n and result in a negative squared distance, which in turn yields a return value of not-a-number. It is up to \n the calling application to handle this condition.\n '''\n # second covariance matrix\n corr2 = make_correlation_matrix(correlation_vector_2)\n covariance_2 = make_covariance_matrix(corr2, error_vector_2)\n \n # form covariance of the coefficient difference vector as the sum of the individual covariances \n cocovar = covariance_1 + covariance_2\n \n # inverse of combined, scaled covariance\n cocovar_inv = np.linalg.inv(cocovar)\n\n # use these in the computation of the square of the Mahalanobis distance, e.g. see source linked from\n # https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.mahalanobis.html\n delta = coeff_vector_1 - coeff_vector_2\n d = np.sqrt(np.dot(np.dot(delta, cocovar_inv), delta))\n \n # resulting Mahalanobis distance follows chi distribution with NUM_XP_COEFFS degrees of freedom (M. Weiler, personal communication)\n # for identical spectra\n return d\n \ndef xp_signal_to_noise(coeff : np.ndarray, coeff_err : np.ndarray, n_valid : int) -> float:\n '''\n Computes global signal-to-noise ratio of XP spectra via L2 norms - see De Angeli et al. (2022) Section 5.3.\n Assume this should be done using only the N relevant bases.\n \n Returns: signal-to-noise ratio for the individual band spectrum coefficients and uncertainties\n '''\n \n # not sure why it is necessary to type check this but the trawl barfs near the end if this catch is not present (!)\n if not type(n_valid) is int: return 0.0\n \n # signal-to-noise from L2 norm of the XP coefficients\n return np.linalg.norm(coeff[:n_valid]) / np.linalg.norm(coeff_err[:n_valid])\n\n@pandas_udf('float')\ndef signal_to_noise_pudf(coeffs : pd.Series, coeff_errors : pd.Series, nums_valid : pd.Series) -> pd.Series:\n '''\n Compute a vector of global signal-to-noise estimates for an individual XP band given the basis\n coefficients, their uncertainties, and the number of valid components.\n '''\n \n # simply vectorize by combining series into a single Pandas dataframe object on which the apply method can be invoked\n return pd.concat([coeffs, coeff_errors, nums_valid], axis = 1).apply(lambda row: xp_signal_to_noise(row[0], row[1], row[2]), axis = 1)\n \ndef find_similar_continuous_spectra(data_frame : DataFrame, template_df : DataFrame) -> DataFrame:\n '''\n Given data frames defining a large set of XP spectra in continuous representation, \n and a single template example also in continuous representation, search the former for cases\n similar to the latter. The data frame of the set of spectra being\n searched is annotated with a dissimilarity statistic: the greater the value the more\n dissimilar is the candidate spectrum to the template given. 
By definition this statistic\n will be zero for the template spectrum if it is present in the set of candidates.\n \n Parameters:\n -----------\n data_frame : DataFrame()\n the data frame encapsulating the set of XP continuous representation spectra to be searched\n template_df : DataFrame()\n the template, also in XP continuous representation encapsulated in a data frame.\n \n return : DataFrame()\n a new data frame annotated with a dissimilarity (increasingly positive) statistic where \n zero indicates a perfect match.\n '''\n \n # convenience reference to template as a Row object:\n template_row = template_df.collect()[0]\n \n # extract the template arrays \n template_bp_coefficients = np.array(template_row['bp_coefficients']).reshape(-1)\n template_bp_coefficient_errors = np.array(template_row['bp_coefficient_errors']).reshape(-1)\n template_bp_correlations = np.array(template_row['bp_coefficient_correlations']).reshape(-1)\n template_rp_coefficients = np.array(template_row['rp_coefficients']).reshape(-1)\n template_rp_coefficient_errors = np.array(template_row['rp_coefficient_errors']).reshape(-1)\n template_rp_correlations = np.array(template_row['rp_coefficient_correlations']).reshape(-1)\n template_gmag = template_row['phot_g_mean_mag']\n\n # precompute the required vectors and matrices for the template\n bp_correl_mat = make_correlation_matrix(template_bp_correlations)\n template_bp_covariance = make_covariance_matrix(bp_correl_mat, template_bp_coefficient_errors)\n rp_correl_mat = make_correlation_matrix(template_rp_correlations)\n template_rp_covariance = make_covariance_matrix(rp_correl_mat, template_rp_coefficient_errors)\n \n # define a vectorised Pandas UDF against the template for comparison of other spectra against it\n @pandas_udf('float')\n def xp_is_similar(bp_coeffs:pd.Series, bp_coeff_errors:pd.Series, bp_correlations:pd.Series, \n rp_coeffs:pd.Series, rp_coeff_errors:pd.Series, rp_correlations:pd.Series,\n gmags:pd.Series) -> pd.Series:\n '''\n Create a similarity metric for the XP continuous representation coefficients with respect \n to those of a predefined template. Similarity is based on a combined BP and RP Mahalanobis\n distance between the coefficient sets with full accounting for covariance. Input arguments\n are series of the BP and RP coefficients, errors and correlation vectors as stored in table\n xp_continuous_mean_spectrum. 
The returned object is a corresponding series of floats of the\n quadrature sum of the BP and RP Mahalanobis distances between each candidate spectrum and\n the static template defined above.\n '''\n \n # initialise the results series\n results = pd.Series(np.full(bp_coeffs.size, 0.0))\n\n # iterate over the data series\n for i in range(bp_coeffs.size):\n \n # normalise the candidate flux coefficients and uncertainties to that of the template\n norm = 10.0**(0.4 * (template_gmag - gmags.iloc[i]))\n bp_coeffs_norm = bp_coeffs.iloc[i] / norm\n bp_coeff_errors_norm = bp_coeff_errors.iloc[i] / norm\n rp_coeffs_norm = rp_coeffs.iloc[i] / norm\n rp_coeff_errors_norm = rp_coeff_errors.iloc[i] / norm\n \n # Mahalanobis distances for the individual BP and RP coefficient sets normalised to the flux level of the template\n bp_mdist = xp_mahalanobis_distance(template_bp_coefficients, template_bp_covariance, bp_coeffs_norm, bp_coeff_errors_norm, bp_correlations.iloc[i])\n rp_mdist = xp_mahalanobis_distance(template_rp_coefficients, template_rp_covariance, rp_coeffs_norm, rp_coeff_errors_norm, rp_correlations.iloc[i])\n \n # combined Mahalanobis distance\n results.iloc[i] = np.sqrt(bp_mdist * bp_mdist + rp_mdist * rp_mdist)\n \n return results\n\n # filter the given data such that only those sources with S/N ratio of 90% or better are tested for similarity for best reliability\n # N.B. completeness is totally neglected here in this illustrative workflow!\n template_bp_n_rel_bases = template_row['bp_n_relevant_bases']\n template_rp_n_rel_bases = template_row['rp_n_relevant_bases']\n threshold_bp_snr = 0.9 * xp_signal_to_noise(template_bp_coefficients, template_bp_coefficient_errors, template_bp_n_rel_bases)\n threshold_rp_snr = 0.9 * xp_signal_to_noise(template_rp_coefficients, template_rp_coefficient_errors, template_rp_n_rel_bases)\n data_frame = data_frame.filter(signal_to_noise_pudf('bp_coefficients', 'bp_coefficient_errors', 'bp_n_relevant_bases') > threshold_bp_snr)\n data_frame = data_frame.filter(signal_to_noise_pudf('rp_coefficients', 'rp_coefficient_errors', 'rp_n_relevant_bases') > threshold_rp_snr)\n \n # add in the similarity statistic\n data_frame = data_frame.withColumn('xp_similar', xp_is_similar(\n data_frame.bp_coefficients, data_frame.bp_coefficient_errors, data_frame.bp_coefficient_correlations, \n data_frame.rp_coefficients, data_frame.rp_coefficient_errors, data_frame.rp_coefficient_correlations, data_frame.phot_g_mean_mag))\n \n # give back the full set filtering out any nulls (which may result from NaN individual Mahalanobis distances)\n return data_frame.filter(data_frame.xp_similar.isNotNull())\n\n", "user": "NHambly", - "dateUpdated": "2022-09-02T08:52:44+0000", + "dateUpdated": "2023-05-15T14:43:54+0000", "progress": 0, "config": { "editorSetting": { @@ -237,19 +238,19 @@ "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662045098333_1096300336", + "jobName": "paragraph_1682508842873_871872279", "id": "paragraph_1662045098333_1096300336", - "dateCreated": "2022-09-01T15:11:38+0000", - "dateStarted": "2022-09-01T15:35:05+0000", - "dateFinished": "2022-09-01T15:35:05+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:43:54+0000", + "dateFinished": "2023-05-15T14:43:54+0000", "status": "FINISHED", "$$hashKey": "object:200" }, { "title": "Set up the template and selection for the trawl", - "text": "%pyspark\n\n# select a high s/n template for the search, e.g. 
Proxima Cen\nsid = 5853498713190525696\n\n# define the template data frame\ntemplate_df = spark.sql('SELECT xp.*, g.phot_g_mean_mag FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id = xp.source_id WHERE g.source_id = %d'%(sid))\n\n# XP filter: Riello et al. (2020) for EDR3, defined by Equation 6 (with coefficients from Table 2) and Equation 18:\nphotometric_consistency_filter = ' AND (' + \\\n '(bp_rp < 0.5 AND ABS(phot_bp_rp_excess_factor - (1.15436 + 0.033772*g.bp_rp + 0.032277*g.bp_rp*g.bp_rp)) < 2.0 * (0.0059898 + 8.817481e-12*POW(g.phot_g_mean_mag, 7.618399))) OR ' + \\\n '(bp_rp BETWEEN 0.5 AND 4.0 AND ABS(phot_bp_rp_excess_factor - (1.162004 + 0.011464*g.bp_rp + 0.049255*g.bp_rp*g.bp_rp -0.005879*g.bp_rp*g.bp_rp*g.bp_rp)) < 2.0 * (0.0059898 + 8.817481e-12*POW(g.phot_g_mean_mag, 7.618399))) OR ' + \\\n '(bp_rp >= 4.0 AND ABS(phot_bp_rp_excess_factor - (1.057572 + 0.0140537*g.bp_rp)) < 2.0 * (0.0059898 + 8.817481e-12*POW(g.phot_g_mean_mag, 7.618399))))'\n# N.B. this \"ultra-clean\" 2-sigma selection loses very faint red objects owing to the GBP photometry issue discussed in Riello et al. 
(2020), Section 8.1\n\n# define a query over the entire dataset, restricting to low reddening for simplicity and also filtering for a clean sample\nquery = 'SELECT xp.*, g.phot_g_mean_mag ' + \\\n 'FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id = xp.source_id ' + \\\n 'WHERE g.ag_gspphot < 0.1 ' + photometric_consistency_filter\n# TEST: give the template only in the df\n#query = 'SELECT * FROM xp_continuous_mean_spectrum WHERE source_id = %d'%(sid)\n\n# sanity check the formatted query\n#print(query)\n\n# define a data frame via the query\ndf = spark.sql(query)\n\n# get any that are similar and show them\nsimilar_df = find_similar_continuous_spectra(df, template_df)\n\n# results quick-look\n#similar_df.show()\n", "user": "NHambly", - "dateUpdated": "2022-09-02T08:53:16+0000", + "dateUpdated": "2023-05-15T14:43:54+0000", "progress": 0, "config": { "editorSetting": { @@ -261,7 +262,7 @@ "colWidth": 12, "editorMode": "ace/mode/python", "fontSize": 9, - "editorHide": false, + "editorHide": true, "title": true, "results": {}, "enabled": true @@ -283,19 +284,19 @@ "group": "spark", "values": [ { - "jobUrl": "http://zeppelin:4041/jobs/job?id=402", - "$$hashKey": "object:1168" + "jobUrl": "http://zeppelin:4040/jobs/job?id=1", + "$$hashKey": "object:817" } ], "interpreterSettingId": "spark" } }, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662045989203_497219070", + "jobName": "paragraph_1682508842873_787473938", "id": "paragraph_1662045989203_497219070", - "dateCreated": "2022-09-01T15:26:29+0000", - "dateStarted": "2022-09-01T15:35:10+0000", - "dateFinished": "2022-09-01T15:35:15+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:43:54+0000", + "dateFinished": "2023-05-15T14:44:02+0000", "status": "FINISHED", "$$hashKey": "object:201" }, @@ -303,7 +304,7 @@ "title": "Action the trawl and collect the top 3 (for example) matches", "text": "%pyspark\n\n# collecting the results to a Pandas data frame collects the results to the driver interpreter process\n# so this cell will actually action the trawl on the Spark worker cluster. 
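If an idea of the scale of the job is needed first, an optional similar_df.count() gives the number of candidates surviving the filters (at the cost of an extra pass over the data). 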
Be prepared to sit back and wait...\ntop3_pdf = similar_df.sort(similar_df.xp_similar.asc()).limit(3).toPandas()\n\n# sanity check as required\n# z.show(top3_pdf)", "user": "NHambly", - "dateUpdated": "2022-09-01T15:35:23+0000", + "dateUpdated": "2023-05-15T14:44:02+0000", "progress": 0, "config": { "editorSetting": { @@ -317,7 +318,8 @@ "fontSize": 9, "title": true, "results": {}, - "enabled": true + "enabled": true, + "editorHide": true }, "settings": { "params": {}, @@ -336,19 +338,19 @@ "group": "spark", "values": [ { - "jobUrl": "http://zeppelin:4041/jobs/job?id=403", - "$$hashKey": "object:907" + "jobUrl": "http://zeppelin:4040/jobs/job?id=2", + "$$hashKey": "object:863" } ], "interpreterSettingId": "spark" } }, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662046085285_632981835", + "jobName": "paragraph_1682508842873_999109583", "id": "paragraph_1662046085285_632981835", - "dateCreated": "2022-09-01T15:28:05+0000", - "dateStarted": "2022-09-01T15:35:23+0000", - "dateFinished": "2022-09-01T16:27:34+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T14:44:02+0000", + "dateFinished": "2023-05-15T17:52:08+0000", "status": "FINISHED", "$$hashKey": "object:202" }, @@ -356,7 +358,7 @@ "title": "Plot sampled spectra (internal calibration)", "text": "%pyspark\n\n# convert to sampled form:\nsampled_spectra, sampling = convert(top3_pdf, save_file = False)\n \n# plot to sanity check:\nplot_spectra(sampled_spectra, sampling = sampling, multi=True, show_plot=True, output_path=None, legend=True)\n\n", "user": "NHambly", - "dateUpdated": "2022-09-01T18:04:37+0000", + "dateUpdated": "2023-05-15T17:52:08+0000", "progress": 0, "config": { "editorSetting": { @@ -370,7 +372,8 @@ "fontSize": 9, "title": true, "results": {}, - "enabled": true + "enabled": true, + "editorHide": true }, "settings": { "params": {}, @@ -385,18 +388,18 @@ }, { "type": "HTML", - "data": "
\n" + "data": "
\n" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662046203039_188861744", + "jobName": "paragraph_1682508842873_1151864136", "id": "paragraph_1662046203039_188861744", - "dateCreated": "2022-09-01T15:30:03+0000", - "dateStarted": "2022-09-01T18:04:37+0000", - "dateFinished": "2022-09-01T18:04:38+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T17:52:08+0000", + "dateFinished": "2023-05-15T17:52:09+0000", "status": "FINISHED", "$$hashKey": "object:203" }, @@ -404,7 +407,7 @@ "title": "Plot externally calibrated spectra", "text": "%pyspark\n\n# externally calibrate the spectra\ncalibrated_spectra, sampling = calibrate(top3_pdf, save_file = False)\n\n# plot the spectra\nplot_spectra(calibrated_spectra, sampling = sampling, legend = True)\n\n", "user": "NHambly", - "dateUpdated": "2022-09-01T18:04:42+0000", + "dateUpdated": "2023-05-15T17:52:09+0000", "progress": 0, "config": { "editorSetting": { @@ -418,7 +421,8 @@ "fontSize": 9, "title": true, "results": {}, - "enabled": true + "enabled": true, + "editorHide": true }, "settings": { "params": {}, @@ -433,18 +437,18 @@ }, { "type": "HTML", - "data": "
\n
\n
\n" + "data": "
\n
\n
\n" } ] }, "apps": [], "runtimeInfos": {}, "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1662046259646_282464556", + "jobName": "paragraph_1682508842874_1960860646", "id": "paragraph_1662046259646_282464556", - "dateCreated": "2022-09-01T15:30:59+0000", - "dateStarted": "2022-09-01T18:04:42+0000", - "dateFinished": "2022-09-01T18:04:43+0000", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T17:52:09+0000", + "dateFinished": "2023-05-15T17:52:09+0000", "status": "FINISHED", "$$hashKey": "object:204" }, @@ -452,7 +456,7 @@ "title": "Further information", "text": "%md\n\n* [Gaia DR3 XP spectra online documentation](https://gea.esac.esa.int/archive/documentation/GDR3/Data_processing/chap_cu5pho/cu5pho_sec_specProcessing/cu5pho_ssec_specInternCal.html#SSS3)\n* [Gaia DR3 spectroscopic data model](https://gea.esac.esa.int/archive/documentation/GDR3/Gaia_archive/chap_datamodel/sec_dm_spectroscopic_tables/)\n* [GaiaXPy website](https://gaia-dpci.github.io/GaiaXPy-website/)\n* [GaiaXPy API documentation](https://gaiaxpy.readthedocs.io/en/latest/gaiaxpy.html)\n* [Vectorised User Defined Functions for PySpark](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html)\n", "user": "NHambly", - "dateUpdated": "2022-09-02T08:54:11+0000", + "dateUpdated": "2023-05-15T17:52:09+0000", "progress": 0, "config": { "tableHide": false, @@ -474,16 +478,6 @@ "params": {}, "forms": {} }, - "apps": [], - "runtimeInfos": {}, - "progressUpdateIntervalMs": 500, - "jobName": "paragraph_1651053931648_1906164321", - "id": "paragraph_1651053931648_1906164321", - "dateCreated": "2022-04-27T10:05:31+0000", - "dateStarted": "2022-09-02T08:54:11+0000", - "dateFinished": "2022-09-02T08:54:11+0000", - "status": "FINISHED", - "$$hashKey": "object:205", "results": { "code": "SUCCESS", "msg": [ @@ -492,13 +486,23 @@ "data": "" } ] - } + }, + "apps": [], + "runtimeInfos": {}, + "progressUpdateIntervalMs": 500, + "jobName": "paragraph_1682508842875_567374071", + "id": "paragraph_1651053931648_1906164321", + "dateCreated": "2023-04-26T11:34:02+0000", + "dateStarted": "2023-05-15T17:52:09+0000", + "dateFinished": "2023-05-15T17:52:09+0000", + "status": "FINISHED", + "$$hashKey": "object:205" } ], "name": "5. Working with Gaia XP spectra", - "id": "2H2YRJCKM", + "id": "2J16SYYU3", "defaultInterpreterGroup": "spark", - "version": "0.10.0", + "version": "0.10.1-gaia-dmp-0.1", "noteParams": {}, "noteForms": {}, "angularObjects": {}, @@ -507,6 +511,8 @@ "looknfeel": "default", "personalizedMode": "false" }, - "info": {}, - "path": "/Public Examples/5. Working with Gaia XP spectra" + "info": { + "isRunning": false + }, + "path": "/Users/NHambly/examples/5. Working with Gaia XP spectra" } \ No newline at end of file diff --git a/Public Examples/ipynb/5. Working with Gaia XP spectra..ipynb b/Public Examples/ipynb/5. Working with Gaia XP spectra..ipynb index 38cbe4c..66d3063 100644 --- a/Public Examples/ipynb/5. Working with Gaia XP spectra..ipynb +++ b/Public Examples/ipynb/5. 
Working with Gaia XP spectra..ipynb @@ -51,7 +51,7 @@ "autoscroll": "auto" }, "outputs": [], - "source": "%pyspark\n\nimport numpy as np\nimport pandas as pd\nfrom pyspark.sql.functions import pandas_udf\nfrom pyspark.sql import DataFrame\n#from scipy.stats import chi\n\n# static constants for use when reconstructing a covariance matrix from the flattened upper-triangular correlation matrix stored in xp_continuous_mean_spectrum\nNUM_XP_COEFFS \u003d 55\n# these indexes define the column-major positions of the correlation vector elements in the lower triangle of the 2d correlation matrix ...\nlower_index \u003d np.tril_indices(NUM_XP_COEFFS,-1)\n# ... note that numpy indexing is row-major. The upper triangular index is created from the lower, reflecting across the diagonal,\n# by transposing the axes - this results in the required column-major indexing for the upper part\nupper_index \u003d (lower_index[1], lower_index[0])\n# correlation matrix, empty apart from unity on the diagonal (off-diagonal elements to be filled in on a case-by-case basis)\ncorrelation_matrix \u003d np.diag(np.ones(NUM_XP_COEFFS))\n\ndef make_correlation_matrix(correlation_vector : np.ndarray) -\u003e np.ndarray:\n \u0027\u0027\u0027\n Returns the fully populated 2d correlation matrix given the flattened, 1d upper-triangular off-diagonal\n elements of the same as persisted in the table of XP continuous representation spectra.\n \u0027\u0027\u0027\n # copy in unique, off-diagonal elements from the flattened correlation vector into the 2d-indexed positions\n correlation_matrix[upper_index] \u003d correlation_vector\n correlation_matrix[lower_index] \u003d correlation_vector\n \n # give back the complete correlation matrix\n return correlation_matrix\n\ndef make_covariance_matrix(complete_correlation_matrix : np.ndarray, coefficient_error_vector : np.ndarray) -\u003e np.ndarray:\n \u0027\u0027\u0027\n Creates the fully reconstructed 2d covariance matrix of an XP continuous representation spectrum given the\n complete 2d correlation matrix and the vector of formal errors on the coefficients. Note that Gaia DPAC CU5 scale \n predicted coefficient errors by the standard deviation as a post-hoc correction to the formal (sqrt) variances.\n GaiaXPy actually reverses this scaling in computation of the covariance matrix to return the latter as exactly\n that produced as a result of the least-squares solution for the coefficients. Such a de-scaling of the errors\n is not applied here: we assume that the uncertainties are best represented with this scaling intact.\n \u0027\u0027\u0027\n # 2d matrix with the errors on the diagonal\n error_matrix \u003d np.diag(coefficient_error_vector)\n \n # from the standard relationship between covariance and correlation\n return error_matrix @ (error_matrix @ complete_correlation_matrix)\n \ndef xp_mahalanobis_distance(coeff_vector_1 : np.ndarray, covariance_1 : np.ndarray,\n coeff_vector_2 : np.ndarray, error_vector_2 : np.ndarray, correlation_vector_2 : np.ndarray) -\u003e float:\n \u0027\u0027\u0027\n Computes the Mahalanobis distance between two XP spectra given template coefficients and fully populated\n covariance for the first, and the data record of the second candidate spectrum, i.e. coefficients, errors and \n upper-triangular part of the correlation matrix in the continuous representation. The second set of\n coefficients and errors should be scaled to the same flux level as the template spectrum. 
This means that\n the distance returned quantifies how close the candidate SED shape is to that of the template regardless any\n difference in intrinsic luminosity.\n \n This function uses plain matrix inversion of the combined covariance matrix. This can be numerically unstable\n and result in a negative squared distance resulting in turn in a return value of not-a-number. It is up to \n the calling application to handle this condition.\n \u0027\u0027\u0027\n # second covariance matrix\n corr2 \u003d make_correlation_matrix(correlation_vector_2)\n covariance_2 \u003d make_covariance_matrix(corr2, error_vector_2)\n \n # form covariance of the coefficient difference vector as sum of individual covariance \n cocovar \u003d covariance_1 + covariance_2\n \n # inverse of combined, scaled covariance\n cocovar_inv \u003d np.linalg.inv(cocovar)\n\n # use these in the computation of the square of the Mahalanobis distance, e.g. see source linked from\n # https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.mahalanobis.html\n delta \u003d coeff_vector_1 - coeff_vector_2\n d \u003d np.sqrt(np.dot(np.dot(delta, cocovar_inv), delta))\n \n # resulting Mahalanobs distance follows chi distribution with NUM_XP_COEFFS degrees of freedom (M. Weiler, personal communication)\n # for identical spectra\n return d\n \ndef find_similar_continuous_spectra(data_frame : DataFrame, template_df : DataFrame) -\u003e DataFrame:\n \u0027\u0027\u0027\n Given data frames defining a large set of XP spectra in continuous representation, \n and a single template example also in continuous representation, search the former for cases\n similar to the latter. The data frame of the set of spectra being\n searched is annotated with a dissimilarity statistic: the greater the value the more\n dissimilar is the candidate spectrum to the template given. 
By definition this statistic\n will be zero for the template spectrum if it is present in the set of candidates.\n \n Parameters:\n -----------\n data_frame : DataFrame()\n the data frame encapsulating the set of XP continuous representation spectra to be searched\n template_df : DataFrame()\n the template, also in XP continuous representation encapsulated in a data frame.\n \n return : DataFrame()\n a new data frame annotated with a dissimilarity (increasingly positive) statistic where \n zero indicates a perfect match.\n \u0027\u0027\u0027\n \n # convenience reference to template as a Row object:\n template_row \u003d template_df.collect()[0]\n \n # extract the template arrays \n template_bp_coefficients \u003d np.array(template_row[\u0027bp_coefficients\u0027]).reshape(-1)\n template_bp_coefficient_errors \u003d np.array(template_row[\u0027bp_coefficient_errors\u0027]).reshape(-1)\n template_bp_correlations \u003d np.array(template_row[\u0027bp_coefficient_correlations\u0027]).reshape(-1)\n template_rp_coefficients \u003d np.array(template_row[\u0027rp_coefficients\u0027]).reshape(-1)\n template_rp_coefficient_errors \u003d np.array(template_row[\u0027rp_coefficient_errors\u0027]).reshape(-1)\n template_rp_correlations \u003d np.array(template_row[\u0027rp_coefficient_correlations\u0027]).reshape(-1)\n template_gmag \u003d template_row[\u0027phot_g_mean_mag\u0027]\n\n # precompute the required vectors and matrices for the template\n bp_correl_mat \u003d make_correlation_matrix(template_bp_correlations)\n template_bp_covariance \u003d make_covariance_matrix(bp_correl_mat, template_bp_coefficient_errors)\n rp_correl_mat \u003d make_correlation_matrix(template_rp_correlations)\n template_rp_covariance \u003d make_covariance_matrix(rp_correl_mat, template_rp_coefficient_errors)\n \n # define a vectorised Pandas UDF against the template for comparison of other spectra against it\n @pandas_udf(\u0027float\u0027)\n def xp_is_similar(bp_coeffs:pd.Series, bp_coeff_errors:pd.Series, bp_correlations:pd.Series, \n rp_coeffs:pd.Series, rp_coeff_errors:pd.Series, rp_correlations:pd.Series,\n gmags:pd.Series) -\u003e pd.Series:\n \u0027\u0027\u0027\n Create a similarity metric for the XP continuous representation coefficients with respect \n to those of a predefined template. Similarity is based on a combined BP and RP Mahalanobis\n distance between the coefficient sets with full accounting for covariance. Input arguments\n are series of the BP and RP coefficients, errors and correlation vectors as stored in table\n xp_continuous_mean_spectrum. 
The returned object is a corresponding series of floats of the\n quadrature sum of the BP and RP Mahalanobis distances between each candidate spectrum and\n the static template defined above.\n \u0027\u0027\u0027\n \n # initialise the results series\n results \u003d pd.Series(np.full(bp_coeffs.size, 0.0))\n\n # iterate over the data series\n for i in range(bp_coeffs.size):\n \n # normalise the candidate flux coefficients and uncertainties to that of the template\n norm \u003d 10.0**(0.4 * (template_gmag - gmags.iloc[i]))\n bp_coeffs_norm \u003d bp_coeffs.iloc[i] / norm\n bp_coeff_errors_norm \u003d bp_coeff_errors.iloc[i] / norm\n rp_coeffs_norm \u003d rp_coeffs.iloc[i] / norm\n rp_coeff_errors_norm \u003d rp_coeff_errors.iloc[i] / norm\n \n # Mahalanobis distances for the individual BP and RP coefficient sets normalised to the flux level of the template\n bp_mdist \u003d xp_mahalanobis_distance(template_bp_coefficients, template_bp_covariance, bp_coeffs_norm, bp_coeff_errors_norm, bp_correlations.iloc[i])\n rp_mdist \u003d xp_mahalanobis_distance(template_rp_coefficients, template_rp_covariance, rp_coeffs_norm, rp_coeff_errors_norm, rp_correlations.iloc[i])\n \n # combined Mahalanobis distance\n results.iloc[i] \u003d np.sqrt(bp_mdist * bp_mdist + rp_mdist * rp_mdist)\n \n return results\n\n # add in the similarity statistic\n data_frame \u003d data_frame.withColumn(\u0027xp_similar\u0027, xp_is_similar(\n data_frame.bp_coefficients, data_frame.bp_coefficient_errors, data_frame.bp_coefficient_correlations, \n data_frame.rp_coefficients, data_frame.rp_coefficient_errors, data_frame.rp_coefficient_correlations, data_frame.phot_g_mean_mag))\n \n # give back the full set filtering out any nulls (which may result from NaN individual Mahalanobis distances)\n return data_frame.filter(data_frame.xp_similar.isNotNull())\n\n" + "source": "%pyspark\n\nimport numpy as np\nimport pandas as pd\nfrom pyspark.sql.functions import pandas_udf\nfrom pyspark.sql import DataFrame\n#from scipy.stats import chi\n\n# static constants for use when reconstructing a covariance matrix from the flattened upper-triangular correlation matrix stored in xp_continuous_mean_spectrum\nNUM_XP_COEFFS \u003d 55\n# these indexes define the column-major positions of the correlation vector elements in the lower triangle of the 2d correlation matrix ...\nlower_index \u003d np.tril_indices(NUM_XP_COEFFS,-1)\n# ... note that numpy indexing is row-major. 
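(For example, with NUM_XP_COEFFS \u003d 3, np.tril_indices(3, -1) gives row indices (1, 2, 2) and column indices (0, 0, 1); swapping these two arrays addresses the positions (0,1), (0,2) and (1,2), i.e. the upper triangle in column-major order.) 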
The upper triangular index is created from the lower, reflecting across the diagonal,\n# by transposing the axes - this results in the required column-major indexing for the upper part\nupper_index \u003d (lower_index[1], lower_index[0])\n# correlation matrix, empty apart from unity on the diagonal (off-diagonal elements to be filled in on a case-by-case basis)\ncorrelation_matrix \u003d np.diag(np.ones(NUM_XP_COEFFS))\n\ndef make_correlation_matrix(correlation_vector : np.ndarray) -\u003e np.ndarray:\n \u0027\u0027\u0027\n Returns the fully populated 2d correlation matrix given the flattened, 1d upper-triangular off-diagonal\n elements of the same as persisted in the table of XP continuous representation spectra.\n \u0027\u0027\u0027\n # copy in unique, off-diagonal elements from the flattened correlation vector into the 2d-indexed positions\n correlation_matrix[upper_index] \u003d correlation_vector\n correlation_matrix[lower_index] \u003d correlation_vector\n \n # give back the complete correlation matrix\n return correlation_matrix\n\ndef make_covariance_matrix(complete_correlation_matrix : np.ndarray, coefficient_error_vector : np.ndarray) -\u003e np.ndarray:\n \u0027\u0027\u0027\n Creates the fully reconstructed 2d covariance matrix of an XP continuous representation spectrum given the\n complete 2d correlation matrix and the vector of formal errors on the coefficients. Note that Gaia DPAC CU5 scale \n predicted coefficient errors by the standard deviation as a post-hoc correction to the formal (sqrt) variances.\n GaiaXPy actually reverses this scaling in computation of the covariance matrix to return the latter as exactly\n that produced as a result of the least-squares solution for the coefficients. Such a de-scaling of the errors\n is not applied here: we assume that the uncertainties are best represented with this scaling intact.\n \u0027\u0027\u0027\n # 2d matrix with the errors on the diagonal\n error_matrix \u003d np.diag(coefficient_error_vector)\n \n # from the standard relationship between covariance and correlation: covariance \u003d D . correlation . D, with D \u003d diag(errors)\n return error_matrix @ complete_correlation_matrix @ error_matrix\n \ndef xp_mahalanobis_distance(coeff_vector_1 : np.ndarray, covariance_1 : np.ndarray,\n coeff_vector_2 : np.ndarray, error_vector_2 : np.ndarray, correlation_vector_2 : np.ndarray) -\u003e float:\n \u0027\u0027\u0027\n Computes the Mahalanobis distance between two XP spectra given template coefficients and fully populated\n covariance for the first, and the data record of the second candidate spectrum, i.e. coefficients, errors and \n upper-triangular part of the correlation matrix in the continuous representation. The second set of\n coefficients and errors should be scaled to the same flux level as the template spectrum. This means that\n the distance returned quantifies how close the candidate SED shape is to that of the template regardless of any\n difference in intrinsic luminosity.\n \n This function uses plain matrix inversion of the combined covariance matrix. This can be numerically unstable\n and result in a negative squared distance, which in turn yields a return value of not-a-number. It is up to \n the calling application to handle this condition.\n \u0027\u0027\u0027\n # second covariance matrix\n corr2 \u003d make_correlation_matrix(correlation_vector_2)\n covariance_2 \u003d make_covariance_matrix(corr2, error_vector_2)\n \n # form covariance of the coefficient difference vector as the sum of the individual covariances \n cocovar \u003d covariance_1 + covariance_2\n \n # inverse of combined, scaled covariance\n cocovar_inv \u003d np.linalg.inv(cocovar)\n\n # use these in the computation of the square of the Mahalanobis distance, e.g. see source linked from\n # https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.mahalanobis.html\n delta \u003d coeff_vector_1 - coeff_vector_2\n d \u003d np.sqrt(np.dot(np.dot(delta, cocovar_inv), delta))\n \n # resulting Mahalanobis distance follows chi distribution with NUM_XP_COEFFS degrees of freedom (M. Weiler, personal communication)\n # for identical spectra\n return d\n \ndef xp_signal_to_noise(coeff : np.ndarray, coeff_err : np.ndarray, n_valid : int) -\u003e float:\n \u0027\u0027\u0027\n Computes global signal-to-noise ratio of XP spectra via L2 norms - see De Angeli et al. (2022) Section 5.3.\n Assume this should be done using only the N relevant bases.\n \n Returns: signal-to-noise ratio for the individual band spectrum coefficients and uncertainties\n \u0027\u0027\u0027\n \n # not sure why it is necessary to type check this but the trawl barfs near the end if this catch is not present (!)\n if not type(n_valid) is int: return 0.0\n \n # signal-to-noise from L2 norm of the XP coefficients\n return np.linalg.norm(coeff[:n_valid]) / np.linalg.norm(coeff_err[:n_valid])\n\n@pandas_udf(\u0027float\u0027)\ndef signal_to_noise_pudf(coeffs : pd.Series, coeff_errors : pd.Series, nums_valid : pd.Series) -\u003e pd.Series:\n \u0027\u0027\u0027\n Compute a vector of global signal-to-noise estimates for an individual XP band given the basis\n coefficients, their uncertainties, and the number of valid components.\n \u0027\u0027\u0027\n \n # simply vectorize by combining series into a single Pandas dataframe object on which the apply method can be invoked\n return pd.concat([coeffs, coeff_errors, nums_valid], axis \u003d 1).apply(lambda row: xp_signal_to_noise(row[0], row[1], row[2]), axis \u003d 1)\n \ndef find_similar_continuous_spectra(data_frame : DataFrame, template_df : DataFrame) -\u003e DataFrame:\n \u0027\u0027\u0027\n Given data frames defining a large set of XP spectra in continuous representation, \n and a single template example also in continuous representation, search the former for cases\n similar to the latter. The data frame of the set of spectra being\n searched is annotated with a dissimilarity statistic: the greater the value the more\n dissimilar is the candidate spectrum to the template given. 
By definition this statistic\n will be zero for the template spectrum if it is present in the set of candidates.\n \n Parameters:\n -----------\n data_frame : DataFrame()\n the data frame encapsulating the set of XP continuous representation spectra to be searched\n template_df : DataFrame()\n the template, also in XP continuous representation encapsulated in a data frame.\n \n return : DataFrame()\n a new data frame annotated with a dissimilarity (increasingly positive) statistic where \n zero indicates a perfect match.\n \u0027\u0027\u0027\n \n # convenience reference to template as a Row object:\n template_row \u003d template_df.collect()[0]\n \n # extract the template arrays \n template_bp_coefficients \u003d np.array(template_row[\u0027bp_coefficients\u0027]).reshape(-1)\n template_bp_coefficient_errors \u003d np.array(template_row[\u0027bp_coefficient_errors\u0027]).reshape(-1)\n template_bp_correlations \u003d np.array(template_row[\u0027bp_coefficient_correlations\u0027]).reshape(-1)\n template_rp_coefficients \u003d np.array(template_row[\u0027rp_coefficients\u0027]).reshape(-1)\n template_rp_coefficient_errors \u003d np.array(template_row[\u0027rp_coefficient_errors\u0027]).reshape(-1)\n template_rp_correlations \u003d np.array(template_row[\u0027rp_coefficient_correlations\u0027]).reshape(-1)\n template_gmag \u003d template_row[\u0027phot_g_mean_mag\u0027]\n\n # precompute the required vectors and matrices for the template\n bp_correl_mat \u003d make_correlation_matrix(template_bp_correlations)\n template_bp_covariance \u003d make_covariance_matrix(bp_correl_mat, template_bp_coefficient_errors)\n rp_correl_mat \u003d make_correlation_matrix(template_rp_correlations)\n template_rp_covariance \u003d make_covariance_matrix(rp_correl_mat, template_rp_coefficient_errors)\n \n # define a vectorised Pandas UDF against the template for comparison of other spectra against it\n @pandas_udf(\u0027float\u0027)\n def xp_is_similar(bp_coeffs:pd.Series, bp_coeff_errors:pd.Series, bp_correlations:pd.Series, \n rp_coeffs:pd.Series, rp_coeff_errors:pd.Series, rp_correlations:pd.Series,\n gmags:pd.Series) -\u003e pd.Series:\n \u0027\u0027\u0027\n Create a similarity metric for the XP continuous representation coefficients with respect \n to those of a predefined template. Similarity is based on a combined BP and RP Mahalanobis\n distance between the coefficient sets with full accounting for covariance. Input arguments\n are series of the BP and RP coefficients, errors and correlation vectors as stored in table\n xp_continuous_mean_spectrum. 
The returned object is a corresponding series of floats: the\n quadrature sum of the BP and RP Mahalanobis distances between each candidate spectrum and\n the static template defined above.\n \u0027\u0027\u0027\n \n # initialise the results series\n results \u003d pd.Series(np.full(bp_coeffs.size, 0.0))\n\n # iterate over the data series\n for i in range(bp_coeffs.size):\n \n # normalise the candidate flux coefficients and uncertainties to the flux level of the template\n norm \u003d 10.0**(0.4 * (template_gmag - gmags.iloc[i]))\n bp_coeffs_norm \u003d bp_coeffs.iloc[i] / norm\n bp_coeff_errors_norm \u003d bp_coeff_errors.iloc[i] / norm\n rp_coeffs_norm \u003d rp_coeffs.iloc[i] / norm\n rp_coeff_errors_norm \u003d rp_coeff_errors.iloc[i] / norm\n \n # Mahalanobis distances for the individual BP and RP coefficient sets normalised to the flux level of the template\n bp_mdist \u003d xp_mahalanobis_distance(template_bp_coefficients, template_bp_covariance, bp_coeffs_norm, bp_coeff_errors_norm, bp_correlations.iloc[i])\n rp_mdist \u003d xp_mahalanobis_distance(template_rp_coefficients, template_rp_covariance, rp_coeffs_norm, rp_coeff_errors_norm, rp_correlations.iloc[i])\n \n # combined Mahalanobis distance\n results.iloc[i] \u003d np.sqrt(bp_mdist * bp_mdist + rp_mdist * rp_mdist)\n \n return results\n\n # filter the given data such that only sources with S/N better than 90% of that of the template are tested for similarity, for best reliability\n # N.B. completeness is totally neglected here in this illustrative workflow!\n template_bp_n_rel_bases \u003d template_row[\u0027bp_n_relevant_bases\u0027]\n template_rp_n_rel_bases \u003d template_row[\u0027rp_n_relevant_bases\u0027]\n threshold_bp_snr \u003d 0.9 * xp_signal_to_noise(template_bp_coefficients, template_bp_coefficient_errors, template_bp_n_rel_bases)\n threshold_rp_snr \u003d 0.9 * xp_signal_to_noise(template_rp_coefficients, template_rp_coefficient_errors, template_rp_n_rel_bases)\n data_frame \u003d data_frame.filter(signal_to_noise_pudf(\u0027bp_coefficients\u0027, \u0027bp_coefficient_errors\u0027, \u0027bp_n_relevant_bases\u0027) \u003e threshold_bp_snr)\n data_frame \u003d data_frame.filter(signal_to_noise_pudf(\u0027rp_coefficients\u0027, \u0027rp_coefficient_errors\u0027, \u0027rp_n_relevant_bases\u0027) \u003e threshold_rp_snr)\n \n # add in the (dis)similarity statistic\n data_frame \u003d data_frame.withColumn(\u0027xp_similar\u0027, xp_is_similar(\n data_frame.bp_coefficients, data_frame.bp_coefficient_errors, data_frame.bp_coefficient_correlations, \n data_frame.rp_coefficients, data_frame.rp_coefficient_errors, data_frame.rp_coefficient_correlations, data_frame.phot_g_mean_mag))\n \n # give back the full set, filtering out any nulls (which may result from NaN individual Mahalanobis distances)\n return data_frame.filter(data_frame.xp_similar.isNotNull())\n\n"
 },
 {
 "cell_type": "code",
 "execution_count": 0,
 "metadata": {
@@ -60,7 +60,7 @@
 "autoscroll": "auto"
 },
 "outputs": [],
- "source": "%pyspark\n\n# select a high s/n template for the search, e.g. 
Proxima Cen\nsid \u003d 5853498713190525696\n\n# defined the template data frame\ntemplate_df \u003d spark.sql(\u0027SELECT xp.*, g.phot_g_mean_mag FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id \u003d xp.source_id WHERE g.source_id \u003d %d\u0027%(sid))\n\n# define a query over the entire dataset, restricting to low reddening for simplicity\nquery \u003d \u0027SELECT xp.*, g.phot_g_mean_mag \u0027 + \\\n \u0027FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id \u003d xp.source_id \u0027 + \\\n \u0027WHERE g.ag_gspphot \u003c 0.1 AND MOD(g.random_index, 20) \u003d 0\u0027\n# TEST: give the template only in the df\n#query \u003d \u0027SELECT * FROM xp_continuous_mean_spectrum WHERE source_id \u003d %d\u0027%(sid)\n\n# sanity check the formatted query\n#print(query)\n\n# define a data frame via the query\ndf \u003d spark.sql(query)\n\n# get any that are similar and show them\nsimilar_df \u003d find_similar_continuous_spectra(df, template_df)\n\n# results quick-look\n#similar_df.show()\n"
+ "source": "%pyspark\n\n# select a high s/n template for the search, e.g. Proxima Cen\nsid \u003d 5853498713190525696\n\n# define the template data frame\ntemplate_df \u003d spark.sql(\u0027SELECT xp.*, g.phot_g_mean_mag FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id \u003d xp.source_id WHERE g.source_id \u003d %d\u0027%(sid))\n\n# XP photometric consistency filter: the corrected BP/RP flux excess factor of Riello et al. (2020) for EDR3, defined by Equation 6 with coefficients from Table 2, cut at twice the scatter model of Equation 18:\nphotometric_consistency_filter \u003d \u0027 AND (\u0027 + \\\n \u0027(g.bp_rp \u003c 0.5 AND ABS(g.phot_bp_rp_excess_factor - (1.15436 + 0.033772*g.bp_rp + 0.032277*g.bp_rp*g.bp_rp)) \u003c 2.0 * (0.0059898 + 8.817481e-12*POW(g.phot_g_mean_mag, 7.618399))) OR \u0027 + \\\n \u0027(g.bp_rp BETWEEN 0.5 AND 4.0 AND ABS(g.phot_bp_rp_excess_factor - (1.162004 + 0.011464*g.bp_rp + 0.049255*g.bp_rp*g.bp_rp - 0.005879*g.bp_rp*g.bp_rp*g.bp_rp)) \u003c 2.0 * (0.0059898 + 8.817481e-12*POW(g.phot_g_mean_mag, 7.618399))) OR \u0027 + \\\n \u0027(g.bp_rp \u003e\u003d 4.0 AND ABS(g.phot_bp_rp_excess_factor - (1.057572 + 0.0140537*g.bp_rp)) \u003c 2.0 * (0.0059898 + 8.817481e-12*POW(g.phot_g_mean_mag, 7.618399))))\u0027\n# N.B. this \"ultra-clean\" 2-sigma selection loses very faint red objects owing to the GBP photometry issue discussed in Riello et al. (2020), Section 8.1\n\n# define a query over the entire dataset, restricting to low reddening for simplicity and also filtering for a clean sample\nquery \u003d \u0027SELECT xp.*, g.phot_g_mean_mag \u0027 + \\\n \u0027FROM xp_continuous_mean_spectrum AS xp INNER JOIN gaia_source AS g ON g.source_id \u003d xp.source_id \u0027 + \\\n \u0027WHERE g.ag_gspphot \u003c 0.1 \u0027 + photometric_consistency_filter\n# TEST: give the template only in the df\n#query \u003d \u0027SELECT * FROM xp_continuous_mean_spectrum WHERE source_id \u003d %d\u0027%(sid)\n\n# sanity check the formatted query\n#print(query)\n\n# define a data frame via the query\ndf \u003d spark.sql(query)\n\n# get any that are similar and show them\nsimilar_df \u003d find_similar_continuous_spectra(df, template_df)\n\n# results quick-look\n#similar_df.show()\n"
 },
 {
 "cell_type": "code",