From 29f091b22798b047193632d14ae01bfdce18806d Mon Sep 17 00:00:00 2001
From: smlmbrt
Date: Tue, 7 Nov 2023 16:22:46 +0000
Subject: [PATCH 01/13] More explicit header expectations (might solve empty row problems)

Signed-off-by: smlmbrt
---
 pgscatalog_utils/ancestry/read.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pgscatalog_utils/ancestry/read.py b/pgscatalog_utils/ancestry/read.py
index 79b1b84..cfb0ed3 100644
--- a/pgscatalog_utils/ancestry/read.py
+++ b/pgscatalog_utils/ancestry/read.py
@@ -18,7 +18,7 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None):
 
     for i, path in enumerate(loc_pcs):
         logger.debug("Reading PCA projection: {}".format(path))
-        df = pd.read_csv(path, sep='\t')
+        df = pd.read_csv(path, sep='\t', header=0)
         df['sampleset'] = dataset
         df.set_index(['sampleset', 'IID'], inplace=True)
 
@@ -52,7 +52,7 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None):
 
 
 def extract_ref_psam_cols(loc_psam, dataset: str, df_target, keepcols=['SuperPop', 'Population']):
-    psam = pd.read_csv(loc_psam, sep='\t')
+    psam = pd.read_csv(loc_psam, sep='\t', header=0)
 
     match (psam.columns[0]):
         # handle case of #IID -> IID (happens when #FID is present)
@@ -76,7 +76,7 @@ def read_pgs(loc_aggscore, onlySUM: bool):
     :return:
     """
     logger.debug('Reading aggregated score data: {}'.format(loc_aggscore))
-    df = pd.read_csv(loc_aggscore, sep='\t', index_col=['sampleset', 'IID'])
+    df = pd.read_csv(loc_aggscore, sep='\t', header=0, index_col=['sampleset', 'IID'])
     if onlySUM:
         df = df[[x for x in df.columns if x.endswith('_SUM')]]
         rn = [x.rstrip('_SUM') for x in df.columns]

From d5d8e3051cbfe161dd066e8aaa01c64550fa344b Mon Sep 17 00:00:00 2001
From: smlmbrt
Date: Tue, 7 Nov 2023 17:03:55 +0000
Subject: [PATCH 02/13] Ensure sample overlap between datasets (that there should be no data loss)

Signed-off-by: smlmbrt
---
 pgscatalog_utils/ancestry/ancestry_analysis.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pgscatalog_utils/ancestry/ancestry_analysis.py b/pgscatalog_utils/ancestry/ancestry_analysis.py
index 39a2461..2371c91 100644
--- a/pgscatalog_utils/ancestry/ancestry_analysis.py
+++ b/pgscatalog_utils/ancestry/ancestry_analysis.py
@@ -26,14 +26,25 @@ def ancestry_analysis():
                             loc_related_ids=args.ref_related, nPCs=maxPCs)
     loc_ref_psam = args.psam
     reference_df = extract_ref_psam_cols(loc_ref_psam, args.d_ref, reference_df, keepcols=[args.ref_label])
+    assert reference_df.shape[0] > 100, "Error: too few reference panel samples. This is an arbitrary threshold " \
+                                        "for input QC; however, it is inadvisable to run this analysis with limited " \
+                                        "reference panel diversity as empirical percentiles are calculated."
 
     loc_target_sscores = args.target_pcs
     target_df = read_pcs(loc_pcs=loc_target_sscores, dataset=args.d_target, nPCs=maxPCs)
+    assert target_df.shape[0] >= 1, "Error: NO target samples found in PCs file."
 
     # Load PGS data & merge with PCA data
     pgs = read_pgs(args.scorefile, onlySUM=True)
     scorecols = list(pgs.columns)
+
+    ## There should be perfect target sample overlap
+    assert all([x in pgs.loc['reference'].index for x in reference_df.index.get_level_values(1)]), \
+        "Error: PGS data missing for reference samples with PCA data."
     reference_df = pd.merge(reference_df, pgs, left_index=True, right_index=True)
+
+    assert all([x in pgs.loc[args.d_target].index for x in target_df.index.get_level_values(1)]), \
+        "Error: PGS data missing for target samples with PCA data."
target_df = pd.merge(target_df, pgs, left_index=True, right_index=True) del pgs # clear raw PGS from memory From 2597a49442f45e4364ec27b1a9d91b5171920f51 Mon Sep 17 00:00:00 2001 From: smlmbrt Date: Tue, 7 Nov 2023 17:18:58 +0000 Subject: [PATCH 03/13] Changes: - Check probability of the sample coming from any population as a proxy for outlier status via PCs - Only run the PC-by-PC stratification check if there are over 20 target samples. These checks will be false positives if the samples are from a population that isn't in the reference. Signed-off-by: smlmbrt --- pgscatalog_utils/ancestry/tools.py | 54 ++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/pgscatalog_utils/ancestry/tools.py b/pgscatalog_utils/ancestry/tools.py index 50869e1..47cffaa 100644 --- a/pgscatalog_utils/ancestry/tools.py +++ b/pgscatalog_utils/ancestry/tools.py @@ -35,8 +35,20 @@ def choose_pval_threshold(args): return set_threshold -def compare_ancestry(ref_df: pd.DataFrame, ref_pop_col: str, target_df: pd.DataFrame, ref_train_col=None, n_pcs=4, method='RandomForest', - covariance_method='EmpiricalCovariance', p_threshold=None): +def get_covariance_method(method_name): + match method_name: + case 'MinCovDet': + covariance_model = MinCovDet() + case 'EmpiricalCovariance': + covariance_model = EmpiricalCovariance() + case _: + assert False, "Invalid covariance method" + + return covariance_model + + +def compare_ancestry(ref_df: pd.DataFrame, ref_pop_col: str, target_df: pd.DataFrame, ref_train_col=None, n_pcs=4, + method='RandomForest', covariance_method='EmpiricalCovariance', p_threshold=None): """ Function to compare target sample ancestry to a reference panel with PCA data :param ref_df: reference dataset @@ -52,7 +64,7 @@ def compare_ancestry(ref_df: pd.DataFrame, ref_pop_col: str, target_df: pd.DataF # Check that datasets have the correct columns assert method in comparison_method_threshold.keys(), 'comparison method parameter must be Mahalanobis or RF' if method == 'Mahalanobis': - assert covariance_method in _mahalanobis_methods, 'ovariance estimation method must be MinCovDet or EmpiricalCovariance' + assert covariance_method in _mahalanobis_methods, 'covariance estimation method must be MinCovDet or EmpiricalCovariance' cols_pcs = ['PC{}'.format(x + 1) for x in range(0, n_pcs)] assert all([col in ref_df.columns for col in cols_pcs]), \ @@ -73,14 +85,27 @@ def compare_ancestry(ref_df: pd.DataFrame, ref_pop_col: str, target_df: pd.DataF else: ref_train_df = ref_df - # Check if PCs only capture target/reference stratification + # Check outlier-ness of target with regard to the reference PCA space compare_info = {} - for col_pc in cols_pcs: - mwu_pc = mannwhitneyu(ref_train_df[col_pc], target_df[col_pc]) - compare_info[col_pc] = {'U': mwu_pc.statistic, 'pvalue': mwu_pc.pvalue} - if mwu_pc.pvalue < 1e-4: - logger.warning("{} *may* be capturing target/reference stratification (Mann-Whitney p-value={}), " - "use visual inspection of PC plot to confirm".format(col_pc, mwu_pc.pvalue)) + pop = 'ALL' + ref_covariance_model = get_covariance_method(covariance_method) + ref_covariance_fit = ref_covariance_model.fit(ref_train_df[cols_pcs]) + colname_dist = 'Mahalanobis_dist_{}'.format(pop) + colname_pval = 'Mahalanobis_P_{}'.format(pop) + target_df[colname_dist] = ref_covariance_fit.mahalanobis(target_df[cols_pcs]) + target_df[colname_pval] = chi2.sf(target_df[colname_dist], n_pcs - 1) + compare_info['Mahalanobis_P_ALL'] = dict(target_df[colname_pval].describe()) + 
logger.info('Mahalanobis Probability Distribution (train: all reference samples): {}'.format( + compare_info['Mahalanobis_P_ALL'])) + + ## Check if PCs only capture target/reference stratification + if target_df.shape[0] >= 20: + for col_pc in cols_pcs: + mwu_pc = mannwhitneyu(ref_train_df[col_pc], target_df[col_pc]) + compare_info[col_pc] = {'U': mwu_pc.statistic, 'pvalue': mwu_pc.pvalue} + if mwu_pc.pvalue < 1e-4: + logger.warning("{} *may* be capturing target/reference stratification (Mann-Whitney p-value={}), " + "use visual inspection of PC plot to confirm".format(col_pc, mwu_pc.pvalue)) # Run Ancestry Assignment methods if method == 'Mahalanobis': @@ -93,13 +118,7 @@ def compare_ancestry(ref_df: pd.DataFrame, ref_pop_col: str, target_df: pd.DataF colname_dist = 'Mahalanobis_dist_{}'.format(pop) colname_pval = 'Mahalanobis_P_{}'.format(pop) - match covariance_method: - case 'MinCovDet': - covariance_model = MinCovDet() - case 'EmpiricalCovariance': - covariance_model = EmpiricalCovariance() - case _: - assert False, "Invalid covariance method" + covariance_model = get_covariance_method(covariance_method) covariance_fit = covariance_model.fit(ref_train_df.loc[ref_train_df[ref_pop_col] == pop, cols_pcs]) @@ -379,6 +398,7 @@ def nLL_mu_and_var(theta, df, c_score, l_predictors): return sum(np.log(np.sqrt(f_var(df[l_predictors], theta_var))) + (1/2)*(x - f_mu(df[l_predictors], theta_mu))**2/f_var(df[l_predictors], theta_var)) + def grdnt_mu_and_var(theta, df, c_score, l_predictors): """Gradient used to optimize the nLL_mu_and_var fit function. Adapted from https://github.com/broadinstitute/palantir-workflows/blob/v0.14/ImputationPipeline/ScoringTasks.wdl, From 55ae0fb2043f5f762ea8e7e777ddebf3f371d481 Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Fri, 17 Nov 2023 10:00:21 +0000 Subject: [PATCH 04/13] handle HTTP connection exceptions --- pgscatalog_utils/download/download_file.py | 40 +++++++++++++--------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/pgscatalog_utils/download/download_file.py b/pgscatalog_utils/download/download_file.py index a7a638e..d069d7f 100644 --- a/pgscatalog_utils/download/download_file.py +++ b/pgscatalog_utils/download/download_file.py @@ -16,10 +16,12 @@ def get_with_user_agent(url: str) -> requests.Response: return requests.get(url, headers=config.headers()) -def download_file(url: str, local_path: str, overwrite: bool, ftp_fallback: bool) -> None: +def download_file(url: str, local_path: str, overwrite: bool, + ftp_fallback: bool) -> None: if config.OUTDIR.joinpath(local_path).exists(): if not overwrite: - logger.warning(f"{config.OUTDIR.joinpath(local_path)} exists and overwrite is false, skipping download") + logger.warning( + f"{config.OUTDIR.joinpath(local_path)} exists and overwrite is false, skipping download") return elif overwrite: logger.warning(f"Overwriting {config.OUTDIR.joinpath(local_path)}") @@ -28,18 +30,24 @@ def download_file(url: str, local_path: str, overwrite: bool, ftp_fallback: bool attempt: int = 0 while attempt < config.MAX_RETRIES: - response: requests.Response = get_with_user_agent(url) - match response.status_code: - case 200: - with open(config.OUTDIR.joinpath(local_path), "wb") as f: - f.write(response.content) - logger.info("HTTPS download complete") - attempt = 0 - break - case _: - logger.warning(f"HTTP status {response.status_code} at download attempt {attempt}") - attempt += 1 - time.sleep(5) + try: + response: requests.Response = get_with_user_agent(url) + match response.status_code: + case 200: + 
with open(config.OUTDIR.joinpath(local_path), "wb") as f: + f.write(response.content) + logger.info("HTTPS download complete") + break + case _: + logger.warning( + f"HTTP status {response.status_code} at download attempt {attempt}") + attempt += 1 + time.sleep(5) + except requests.RequestException as e: + logger.warning(f"Connection error: {e}") + attempt += 1 + time.sleep(5) + logger.warning(f"Retrying download {attempt=} of {config.MAX_RETRIES}") if attempt > config.MAX_RETRIES: if ftp_fallback: @@ -67,9 +75,9 @@ def _ftp_fallback_download(url: str, local_path: str) -> None: except Exception as e: if "421" in str(e): retries += 1 - logger.debug(f"FTP server is busy. Waiting and retrying. Retry {retries} of {config.MAX_RETRIES}") + logger.debug( + f"FTP server is busy. Waiting and retrying. Retry {retries} of {config.MAX_RETRIES}") time.sleep(config.DOWNLOAD_WAIT_TIME) else: logger.critical(f"Download failed: {e}") raise Exception - From 29ff0ccf55992d8fa2c34507352b9150ac4c0a0b Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Tue, 28 Nov 2023 14:59:51 +0000 Subject: [PATCH 05/13] fix assert which fails when no matches are found --- pgscatalog_utils/match/combine_matches.py | 33 ++++++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/pgscatalog_utils/match/combine_matches.py b/pgscatalog_utils/match/combine_matches.py index a716f3f..32597c6 100644 --- a/pgscatalog_utils/match/combine_matches.py +++ b/pgscatalog_utils/match/combine_matches.py @@ -23,9 +23,12 @@ def combine_matches(): config.OUTDIR = args.outdir with pl.StringCache(): - scorefile = read_scorefile(path=args.scorefile, chrom=None) # chrom=None to read all variants + scorefile = read_scorefile(path=args.scorefile, + chrom=None) # chrom=None to read all variants logger.debug("Reading matches") - matches = pl.concat([pl.scan_ipc(x, memory_map=False, rechunk=False) for x in args.matches], rechunk=False) + matches = pl.concat( + [pl.scan_ipc(x, memory_map=False, rechunk=False) for x in args.matches], + rechunk=False) logger.debug("Labelling match candidates") params: dict[str, bool] = make_params_dict(args) @@ -49,7 +52,20 @@ def _check_duplicate_vars(matches: pl.LazyFrame): .collect() .get_column('count') .to_list()) - assert max_occurrence == [1], "Duplicate IDs in final matches" + + match n := max_occurrence[0]: + case None: + logger.critical("No variant matches found") + logger.critical( + "Did you set the correct genome build? Did you impute your genomes?") + raise ValueError + case _ if n > 1: + logger.critical("Duplicate IDs in final matches") + logger.critical( + "Please double check your genomes for duplicates and try again") + raise ValueError + case _: + logger.info("No duplicate variants found") def _parse_args(args=None): @@ -61,18 +77,21 @@ def _parse_args(args=None): parser.add_argument('-m', '--matches', dest='matches', required=True, nargs='+', help=' List of match files') parser.add_argument('--min_overlap', dest='min_overlap', required=True, - type=float, help=' Minimum proportion of variants to match before error') + type=float, + help=' Minimum proportion of variants to match before error') parser.add_argument('-IDs', '--filter_IDs', dest='filter', help=' Path to file containing list of variant IDs that can be included in the final scorefile.' 
'[useful for limiting scoring files to variants present in multiple datasets]') - parser = add_match_args(parser) # params for labelling matches + parser = add_match_args(parser) # params for labelling matches parser.add_argument('--outdir', dest='outdir', required=True, help=' Output directory') parser.add_argument('--split', dest='split', default=False, action='store_true', help=' Write scorefiles split per chromosome?') - parser.add_argument('--combined', dest='combined', default=False, action='store_true', + parser.add_argument('--combined', dest='combined', default=False, + action='store_true', help=' Write scorefiles in combined format?') - parser.add_argument('-n', dest='n_threads', default=1, help=' n threads for matching', type=int) + parser.add_argument('-n', dest='n_threads', default=1, + help=' n threads for matching', type=int) parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help=' Extra logging information') return parser.parse_args(args) From ab3aa4feb887c7689d54575b1abc2e9d3bd17af1 Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Tue, 28 Nov 2023 16:07:01 +0000 Subject: [PATCH 06/13] clarify log message --- pgscatalog_utils/match/combine_matches.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgscatalog_utils/match/combine_matches.py b/pgscatalog_utils/match/combine_matches.py index 32597c6..71b96d6 100644 --- a/pgscatalog_utils/match/combine_matches.py +++ b/pgscatalog_utils/match/combine_matches.py @@ -65,7 +65,7 @@ def _check_duplicate_vars(matches: pl.LazyFrame): "Please double check your genomes for duplicates and try again") raise ValueError case _: - logger.info("No duplicate variants found") + logger.info("Scoring files are valid (no duplicate variants found)") def _parse_args(args=None): From d626b60843b1fbb06f5a6ae34b2040a6c0491b4e Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Tue, 28 Nov 2023 16:18:28 +0000 Subject: [PATCH 07/13] bump for patch --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9d8a99c..571ed03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pgscatalog_utils" -version = "0.4.2" +version = "0.4.3" description = "Utilities for working with PGS Catalog API and scoring files" homepage = "https://github.com/PGScatalog/pgscatalog_utils" authors = ["Benjamin Wingfield ", "Samuel Lambert ", "Laurent Gil "] From 14b93a1b34ecfb4edfc848cbeed091d01d69a221 Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Wed, 29 Nov 2023 11:26:59 +0000 Subject: [PATCH 08/13] fix numeric IIDs --- pgscatalog_utils/ancestry/read.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pgscatalog_utils/ancestry/read.py b/pgscatalog_utils/ancestry/read.py index 79b1b84..77e6476 100644 --- a/pgscatalog_utils/ancestry/read.py +++ b/pgscatalog_utils/ancestry/read.py @@ -18,7 +18,7 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None): for i, path in enumerate(loc_pcs): logger.debug("Reading PCA projection: {}".format(path)) - df = pd.read_csv(path, sep='\t') + df = pd.read_csv(path, sep='\t', converters={"IID": str}) df['sampleset'] = dataset df.set_index(['sampleset', 'IID'], inplace=True) @@ -46,7 +46,10 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None): IDs_related = [x.strip() for x in infile.readlines()] proj.loc[proj.index.get_level_values(level=1).isin(IDs_related), 'Unrelated'] = False else: - proj['Unrelated'] = 
np.nan + # if unrelated is all nan -> dtype is float64 + # if unrelated is only true / false -> dtype is bool + # if unrelated contains None + proj['Unrelated'] = None return proj @@ -76,7 +79,7 @@ def read_pgs(loc_aggscore, onlySUM: bool): :return: """ logger.debug('Reading aggregated score data: {}'.format(loc_aggscore)) - df = pd.read_csv(loc_aggscore, sep='\t', index_col=['sampleset', 'IID']) + df = pd.read_csv(loc_aggscore, sep='\t', index_col=['sampleset', 'IID'], converters={"IID": str}) if onlySUM: df = df[[x for x in df.columns if x.endswith('_SUM')]] rn = [x.rstrip('_SUM') for x in df.columns] From 4881b44e81a981e99c63e0d353d71802c927948e Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Wed, 29 Nov 2023 11:30:27 +0000 Subject: [PATCH 09/13] re-add header=0 --- pgscatalog_utils/ancestry/read.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pgscatalog_utils/ancestry/read.py b/pgscatalog_utils/ancestry/read.py index 1b40ac0..0ac2e2f 100644 --- a/pgscatalog_utils/ancestry/read.py +++ b/pgscatalog_utils/ancestry/read.py @@ -18,7 +18,7 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None): for i, path in enumerate(loc_pcs): logger.debug("Reading PCA projection: {}".format(path)) - df = pd.read_csv(path, sep='\t', converters={"IID": str}) + df = pd.read_csv(path, sep='\t', converters={"IID": str}, header=0) df['sampleset'] = dataset df.set_index(['sampleset', 'IID'], inplace=True) @@ -79,7 +79,7 @@ def read_pgs(loc_aggscore, onlySUM: bool): :return: """ logger.debug('Reading aggregated score data: {}'.format(loc_aggscore)) - df = pd.read_csv(loc_aggscore, sep='\t', index_col=['sampleset', 'IID'], converters={"IID": str}) + df = pd.read_csv(loc_aggscore, sep='\t', index_col=['sampleset', 'IID'], converters={"IID": str}, header=0) if onlySUM: df = df[[x for x in df.columns if x.endswith('_SUM')]] rn = [x.rstrip('_SUM') for x in df.columns] From 1cc3d96ae83825cee4ba420d95e5343aa58910be Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Wed, 29 Nov 2023 11:34:22 +0000 Subject: [PATCH 10/13] finish comment --- pgscatalog_utils/ancestry/read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pgscatalog_utils/ancestry/read.py b/pgscatalog_utils/ancestry/read.py index 0ac2e2f..8763e07 100644 --- a/pgscatalog_utils/ancestry/read.py +++ b/pgscatalog_utils/ancestry/read.py @@ -48,7 +48,7 @@ def read_pcs(loc_pcs: list[str],dataset: str, loc_related_ids=None, nPCs=None): else: # if unrelated is all nan -> dtype is float64 # if unrelated is only true / false -> dtype is bool - # if unrelated contains None + # if unrelated contains None, dtype stays bool, and pd.concat warning disappears proj['Unrelated'] = None return proj From 3d41f9c3f46b625fefba1098660b4ffaf1baca9c Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Wed, 29 Nov 2023 16:49:31 +0000 Subject: [PATCH 11/13] add link to log --- pgscatalog_utils/samplesheet/check.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pgscatalog_utils/samplesheet/check.py b/pgscatalog_utils/samplesheet/check.py index 7402efb..f282678 100755 --- a/pgscatalog_utils/samplesheet/check.py +++ b/pgscatalog_utils/samplesheet/check.py @@ -226,7 +226,11 @@ def _resolve_paths(path_list: list[str], filetype: str) -> list[str]: resolved_list.append(str(resolved)) else: logger.critical( - f"{resolved} doesn't exist, please check samplesheet path_prefix and try again") + f"{resolved} doesn't exist, please check samplesheet path_prefix and try again" + ) + 
logger.critical( + "If you're 100% sure this file exists and you're confused by this error, please check https://pgsc-calc.readthedocs.io/en/latest/how-to/mount.html" + ) raise FileNotFoundError return resolved_list From 850eb1c32234936e5c3e2973f8613b386179c764 Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Wed, 29 Nov 2023 17:12:23 +0000 Subject: [PATCH 12/13] check for chr prefix in samplesheet --- pgscatalog_utils/samplesheet/check.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pgscatalog_utils/samplesheet/check.py b/pgscatalog_utils/samplesheet/check.py index f282678..377bc06 100755 --- a/pgscatalog_utils/samplesheet/check.py +++ b/pgscatalog_utils/samplesheet/check.py @@ -27,8 +27,12 @@ def _parse_args(args=None) -> argparse.Namespace: def _truncate_chrom(chrom): try: return str(int(chrom)) # truncate numeric chromosomes 22.0 -> 22 - except ValueError: # it's OK if chrom is a string e.g. MT / X / Y - return chrom + except ValueError: + if "chr" in chrom: + logger.critical("Please remove chr prefix from samplesheet chromosome column e.g. chr1 -> 1, chrX -> X") + raise ValueError("chr prefix detected") + else: + return chrom # it's OK if chrom is a string e.g. MT / X / Y except TypeError: # also OK if chrom is missing entirely return None From 4035311f48984a9ed5a06b125a0ea8d709628a1e Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Wed, 29 Nov 2023 17:31:33 +0000 Subject: [PATCH 13/13] fix missing chromosomes --- pgscatalog_utils/samplesheet/check.py | 163 +++++++++++++++----------- 1 file changed, 93 insertions(+), 70 deletions(-) diff --git a/pgscatalog_utils/samplesheet/check.py b/pgscatalog_utils/samplesheet/check.py index 377bc06..b1ff16b 100755 --- a/pgscatalog_utils/samplesheet/check.py +++ b/pgscatalog_utils/samplesheet/check.py @@ -13,33 +13,38 @@ def _parse_args(args=None) -> argparse.Namespace: - d: str = "Convert pgscatalog/pgsc_calc samplesheet file to JSON and check its contents." + d: ( + str + ) = "Convert pgscatalog/pgsc_calc samplesheet file to JSON and check its contents." e: str = "Example usage: python check.py " parser: argparse.ArgumentParser = argparse.ArgumentParser(description=d, epilog=e) parser.add_argument("FILE_IN", help="Input samplesheet file.") - parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', - help=' Extra logging information') + parser.add_argument( + "-v", + "--verbose", + dest="verbose", + action="store_true", + help=" Extra logging information", + ) parser.add_argument("FILE_OUT", help="Output file.") return parser.parse_args(args) def _truncate_chrom(chrom): - try: - return str(int(chrom)) # truncate numeric chromosomes 22.0 -> 22 - except ValueError: - if "chr" in chrom: + match chrom: + case _ if chrom.isdigit(): + return int(chrom) + case _ if chrom.startswith("chr"): logger.critical("Please remove chr prefix from samplesheet chromosome column e.g. chr1 -> 1, chrX -> X") raise ValueError("chr prefix detected") - else: - return chrom # it's OK if chrom is a string e.g. 
MT / X / Y - except TypeError: # also OK if chrom is missing entirely - return None + case _: + return chrom def _check_colnames(df: pd.DataFrame): - mandatory: list[str] = ['sampleset', 'path_prefix', 'chrom', 'format'] - optional: list[str] = ['vcf_genotype_field'] + mandatory: list[str] = ["sampleset", "path_prefix", "chrom", "format"] + optional: list[str] = ["vcf_genotype_field"] if not set(mandatory) == set(df.columns): if set(mandatory + optional) == set(df.columns): @@ -48,14 +53,17 @@ def _check_colnames(df: pd.DataFrame): else: logger.critical("Samplesheet has invalid header row") logger.critical(f"Column names must only include: {mandatory}") - [logger.critical(f"Invalid column name: {col}") for col in df if - col not in mandatory] + [ + logger.critical(f"Invalid column name: {col}") + for col in df + if col not in mandatory + ] raise Exception def _check_unique_paths(df: pd.DataFrame): - """ Each row in a samplesheet should have a unique path """ - duplicated: pd.Series = df['path_prefix'].duplicated() + """Each row in a samplesheet should have a unique path""" + duplicated: pd.Series = df["path_prefix"].duplicated() for idx, duplicate in duplicated.items(): if duplicate: bad_record = df.iloc[:idx] @@ -63,8 +71,8 @@ def _check_unique_paths(df: pd.DataFrame): def _check_empty_paths(df: pd.DataFrame): - """ Paths are mandatory """ - empty_paths: pd.Series = df['path_prefix'].isnull() + """Paths are mandatory""" + empty_paths: pd.Series = df["path_prefix"].isnull() for idx, empty in empty_paths.items(): if empty: logger.critical(f"Empty path found in samplesheet:\n {df.iloc[[idx]]}") @@ -72,8 +80,8 @@ def _check_empty_paths(df: pd.DataFrame): def _read_samplesheet(path: str) -> pd.DataFrame: - csv: pd.DataFrame = pd.read_csv(path, sep=',', header=0) - csv['chrom'] = csv['chrom'].apply(_truncate_chrom) + csv: pd.DataFrame = pd.read_csv(path, sep=",", header=0, converters={"chrom": str}) + csv["chrom"] = csv["chrom"].apply(_truncate_chrom) return csv @@ -85,8 +93,8 @@ def _check_paths(df: pd.DataFrame) -> None: def _get_chrom_list(df: pd.DataFrame) -> dict[str, list[str | None]]: chrom_dict = {} for idx, row in df.iterrows(): - key = row['sampleset'] - value = row['chrom'] + key = row["sampleset"] + value = row["chrom"] try: if math.isnan(value): value = None @@ -101,8 +109,9 @@ def _get_chrom_list(df: pd.DataFrame) -> dict[str, list[str | None]]: def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None: seen = set() - duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if - x in seen or seen.add(x)] + duplicate_chromosomes: list[str] = [ + str(x) for x in chrom_list if x in seen or seen.add(x) + ] if duplicate_chromosomes: logger.critical(f"Duplicate chromosomes detected in sampleset {sampleset}") logger.critical(f"Duplicate chromosomes: {duplicate_chromosomes}") @@ -113,11 +122,14 @@ def _check_multiple_missing_chrom(sampleset: str, chrom_list: dict) -> None: for chrom in chrom_list: if chrom is None and len(chrom_list) != 1: logger.critical( - f"Sampleset {sampleset} has rows with multiple missing chromosomes") + f"Sampleset {sampleset} has rows with multiple missing chromosomes" + ) logger.critical( - "If you have file with multiple chromosomes, delete the duplicate rows") + "If you have file with multiple chromosomes, delete the duplicate rows" + ) logger.critical( - "If your data are split per chromosome, then chromosomes must be set for all rows") + "If your data are split per chromosome, then chromosomes must be set for all rows" + ) raise Exception 
@@ -131,39 +143,41 @@ def _check_chrom(df: pd.DataFrame) -> None: def _check_format(df: pd.DataFrame): - """ Make sure the file format is a valid choice """ + """Make sure the file format is a valid choice""" for idx, row in df.iterrows(): - valid_formats: list[str] = ['vcf', 'pfile', 'bfile'] - if row['format'] not in valid_formats: + valid_formats: list[str] = ["vcf", "pfile", "bfile"] + if row["format"] not in valid_formats: logger.critical( - f"Invalid format: {row['format']} must be one of {valid_formats}") + f"Invalid format: {row['format']} must be one of {valid_formats}" + ) logger.critical(f"\n{df.iloc[[idx]]}") raise Exception def _setup_paths(df: pd.DataFrame) -> pd.DataFrame: - """ Add suffix to path prefixes depending on file format / type """ + """Add suffix to path prefixes depending on file format / type""" paths: list[pd.Series] = [] for idx, row in df.iterrows(): suffix: list[str] - match row['format']: - case 'vcf': + match row["format"]: + case "vcf": logger.info("Setting VCF input") - suffix = ['.vcf.gz'] - case 'bfile': + suffix = [".vcf.gz"] + case "bfile": logger.info("Setting plink1 binary fileset (bfile) input") - suffix = ['.bed', '.bim', '.fam'] - case 'pfile': + suffix = [".bed", ".bim", ".fam"] + case "pfile": logger.info("Setting plink2 binary fileset (pfile) input") - suffix = ['.pgen', '.pvar', '.psam'] + suffix = [".pgen", ".pvar", ".psam"] case _: raise Exception resolved_paths: list[str] = _resolve_paths( - [row['path_prefix'] + x for x in suffix], row['format']) + [row["path_prefix"] + x for x in suffix], row["format"] + ) paths.append(pd.Series(data=[resolved_paths], index=[idx])) - df['path'] = pd.concat(paths) + df["path"] = pd.concat(paths) return df @@ -171,7 +185,7 @@ def _resolve_compressed_variant_path(path: str) -> pathlib.Path: # .bim.zst | .bim -> OK # .pvar.zst | .pvar -> OK # anything else not OK - zstd_ext: str = '.zst' + zstd_ext: str = ".zst" compressed_path: pathlib.Path = pathlib.Path(path + zstd_ext).resolve() uncompressed_path: pathlib.Path = pathlib.Path(path).resolve() @@ -181,13 +195,15 @@ def _resolve_compressed_variant_path(path: str) -> pathlib.Path: return compressed_path elif uncompressed_path.exists(): logger.info( - f"Couldn't find compressed variant information file, trying {uncompressed_path.name}") + f"Couldn't find compressed variant information file, trying {uncompressed_path.name}" + ) return uncompressed_path else: logger.critical(f"{compressed_path} doesn't exist") logger.critical(f"{uncompressed_path} doesn't exist") logger.critical( - "Couldn't find variant information files, please check samplesheet path_prefix and try again") + "Couldn't find variant information files, please check samplesheet path_prefix and try again" + ) raise Exception @@ -198,7 +214,8 @@ def _resolve_paths(path_list: list[str], filetype: str) -> list[str]: base_dir: Path = Path(Config.input_path).resolve().parent if (path := Path(Config.input_path)).is_symlink(): logger.info( - f"Input file {path} is symlinked, resolving to absolute path {path.resolve()}") + f"Input file {path} is symlinked, resolving to absolute path {path.resolve()}" + ) for path in path_list: if path.startswith("https://") | path.startswith("s3://"): @@ -212,15 +229,18 @@ def _resolve_paths(path_list: list[str], filetype: str) -> list[str]: p: Path = Path(path) if not p.is_absolute(): logger.warning( - "Relative path detected in samplesheet. 
Set absolute paths to silence this warning.") - logger.warning( - "Assuming input samplesheet is a symlinked file in a nextflow working directory") + "Relative path detected in samplesheet. Set absolute paths to silence this warning." + ) logger.warning( - "Following symlink and attempting to resolve path relative to input file") + "Assuming input samplesheet is a symlinked file in a nextflow working directory" + ) logger.warning( - f"Resolving paths relative to: {base_dir}") - resolved = _resolve_filetypes(path=str(base_dir.joinpath(path)), - filetype=filetype) + "Following symlink and attempting to resolve path relative to input file" + ) + logger.warning(f"Resolving paths relative to: {base_dir}") + resolved = _resolve_filetypes( + path=str(base_dir.joinpath(path)), filetype=filetype + ) else: logger.info("Absolute path detected") resolved = _resolve_filetypes(filetype=filetype, path=str(p)) @@ -242,13 +262,13 @@ def _resolve_paths(path_list: list[str], filetype: str) -> list[str]: def _resolve_filetypes(filetype: str, path: str) -> Path: match filetype: - case 'pfile' | 'bfile': - if path.endswith('.bim') or path.endswith('.pvar'): + case "pfile" | "bfile": + if path.endswith(".bim") or path.endswith(".pvar"): resolved = _resolve_compressed_variant_path(path) else: # bed / pgen | fam / psam resolved = pathlib.Path(path).resolve() - case 'vcf': + case "vcf": resolved = pathlib.Path(path).resolve() case _: logger.critical(f"Unsupported filetype {filetype}") @@ -258,24 +278,25 @@ def _resolve_filetypes(filetype: str, path: str) -> Path: def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame: - df['vcf_import_dosage'] = False # (dosage off by default) - if 'vcf_genotype_field' in df.columns: + df["vcf_import_dosage"] = False # (dosage off by default) + if "vcf_genotype_field" in df.columns: logger.debug("vcf_genotype_field detected") for index, row in df.iterrows(): - if row['vcf_genotype_field'] not in ['GT', 'DS']: + if row["vcf_genotype_field"] not in ["GT", "DS"]: missing: bool # missing dosage is OK try: - missing = math.isnan(row['vcf_genotype_field']) + missing = math.isnan(row["vcf_genotype_field"]) except TypeError: missing = False if not missing: logger.critical( - f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}") + f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}" + ) logger.critical(f"\n {row}") raise Exception - df.loc[df['vcf_genotype_field'] == 'DS', 'vcf_import_dosage'] = True + df.loc[df["vcf_genotype_field"] == "DS", "vcf_import_dosage"] = True else: logger.info("no vcf_genotype_field detected") @@ -283,23 +304,26 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame: def _check_reserved_names(df: pd.DataFrame): - if any(df['sampleset'] == 'reference'): + if any(df["sampleset"] == "reference"): logger.critical( - "Samplesets must not be named 'reference', please rename in the sample sheet") + "Samplesets must not be named 'reference', please rename in the sample sheet" + ) raise Exception # Check whether reference contains reserved tokens from nextflow channels - badnames = [x for x in df['sampleset'] if ('.' in x or '_' in x)] + badnames = [x for x in df["sampleset"] if ("." 
in x or "_" in x)] if len(badnames) > 0: logger.critical( "Samplesets must not contain any reserved characters ( '_' , '.'), " "please rename the following samples in the sample sheet: {}".format( - badnames)) + badnames + ) + ) raise Exception def _check_one_sampleset(df: pd.DataFrame): - samplesets = set(df['sampleset'].to_list()) + samplesets = set(df["sampleset"].to_list()) if len(samplesets) > 1: logger.critical(f"Multiple samplesets defined in the samplesheet {samplesets}") sampleset_error = """ Only one sampleset per samplesheet is supported @@ -307,7 +331,7 @@ def _check_one_sampleset(df: pd.DataFrame): pgsc_calc works best with cohorts Individual VCFs should be merged into a multi-sample VCF If you want to process multiple cohorts, please run pgsc_calc multiple times with different samplesheets. """ - [logger.critical(x.strip()) for x in sampleset_error.split('\n')] + [logger.critical(x.strip()) for x in sampleset_error.split("\n")] raise Exception("Multiple samplesets") @@ -338,8 +362,7 @@ def check_samplesheet() -> None: df = _check_genotype_field(df) # dosages logger.info("Samplesheet checks complete") - (df.drop(['path_prefix'], axis=1) - .to_json(Config.output_path, orient='records')) + (df.drop(["path_prefix"], axis=1).to_json(Config.output_path, orient="records")) logger.info(f"JSON file successfully written to {Config.output_path}")
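
Note on PATCH 03/13: the new 'ALL' check fits a covariance model on the reference panel's PC coordinates and turns each target sample's squared Mahalanobis distance into a chi-squared survival probability, reusing the same machinery as the per-population Mahalanobis assignment. A minimal standalone sketch of that idea is below; the random reference/target arrays, the seed, and n_pcs = 4 are invented for illustration and are not part of the patch.

import numpy as np
from scipy.stats import chi2
from sklearn.covariance import EmpiricalCovariance

rng = np.random.default_rng(42)
n_pcs = 4
ref_pcs = rng.normal(size=(500, n_pcs))    # stand-in for reference panel PC projections
target_pcs = rng.normal(size=(25, n_pcs))  # stand-in for target sample PC projections

# Fit the covariance of the reference PCA space, then score targets against it.
fit = EmpiricalCovariance().fit(ref_pcs)
dist = fit.mahalanobis(target_pcs)         # sklearn returns *squared* Mahalanobis distances
pval = chi2.sf(dist, df=n_pcs - 1)         # the patch uses n_pcs - 1 degrees of freedom

# Very small probabilities flag samples that look like outliers relative to the reference panel.
print(pval.round(3))

Summarising the distribution of these probabilities (as compare_info['Mahalanobis_P_ALL'] does in the patch) gives a quick proxy for whether the target cohort sits inside the reference PCA space at all, which is why the per-PC Mann-Whitney stratification check can then be restricted to larger target cohorts.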
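
Note on PATCH 04/13: wrapping the GET call in try/except means a dropped connection now counts as a failed attempt rather than an unhandled crash, so the existing retry and FTP-fallback logic still applies. A simplified sketch of that retry pattern follows; MAX_RETRIES, the fetch() name, and the plain print() calls are stand-ins for the module's config object and logger, not the actual implementation.

import time
import requests

MAX_RETRIES = 5  # stand-in for config.MAX_RETRIES


def fetch(url: str) -> bytes | None:
    """Return response content, retrying on bad status codes and connection errors."""
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                return response.content
            print(f"HTTP status {response.status_code} at attempt {attempt}")
        except requests.RequestException as e:
            # Timeouts, connection resets, and DNS failures all inherit from RequestException
            print(f"Connection error: {e}")
        attempt += 1
        time.sleep(5)
    return None  # caller can fall back to FTP, as download_file() does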
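
Note on PATCH 08/13: passing converters={"IID": str} to pd.read_csv stops pandas from parsing all-numeric sample IDs as integers, which would otherwise drop leading zeros and break the ['sampleset', 'IID'] index join between the PCA projections and the aggregated scores. A small self-contained illustration; the inline TSV content is made up for the example.

import io
import pandas as pd

tsv = "IID\tPC1\n000123\t0.1\n7\t0.2\n"

default = pd.read_csv(io.StringIO(tsv), sep="\t")
as_str = pd.read_csv(io.StringIO(tsv), sep="\t", converters={"IID": str})

print(default["IID"].tolist())  # [123, 7]        -> leading zeros lost, dtype int64
print(as_str["IID"].tolist())   # ['000123', '7'] -> IDs kept as strings, so index joins match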