Fix path resolution in check_samplesheet #56

Merged · 3 commits · Sep 19, 2023
Changes from all commits
7 changes: 7 additions & 0 deletions pgscatalog_utils/samplesheet/Config.py
@@ -0,0 +1,7 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class Config:
+    input_path: str
+    output_path: str
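
A note on how this class is used: although Config is declared as a dataclass, the checker never instantiates it. check_samplesheet() assigns input_path and output_path as class attributes, which _resolve_paths() then reads as shared global state. A minimal sketch of the pattern (the paths are hypothetical, not from this PR):

    from pgscatalog_utils.samplesheet.Config import Config

    # No Config(...) instance is created: values are attached to the class
    # itself, so every module that imports Config sees the same state.
    Config.input_path = "samplesheet.csv"    # hypothetical input path
    Config.output_path = "samplesheet.json"  # hypothetical output path

    print(Config.input_path)  # -> samplesheet.csv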
132 changes: 89 additions & 43 deletions pgscatalog_utils/samplesheet/check.py
@@ -1,13 +1,13 @@
import argparse
import logging
import math
import os
import pathlib
from pathlib import Path

import pandas as pd

from pathlib import Path
from pgscatalog_utils import config
from pgscatalog_utils.samplesheet.Config import Config

logger = logging.getLogger(__name__)

@@ -44,7 +44,8 @@ def _check_colnames(df: pd.DataFrame):
     else:
         logger.critical("Samplesheet has invalid header row")
         logger.critical(f"Column names must only include: {mandatory}")
-        [logger.critical(f"Invalid column name: {col}") for col in df if col not in mandatory]
+        [logger.critical(f"Invalid column name: {col}") for col in df if
+         col not in mandatory]
         raise Exception


@@ -96,7 +97,8 @@ def _get_chrom_list(df: pd.DataFrame) -> dict[str, list[str | None]]:
 
 def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
     seen = set()
-    duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if x in seen or seen.add(x)]
+    duplicate_chromosomes: list[str] = [str(x) for x in chrom_list if
+                                        x in seen or seen.add(x)]
     if duplicate_chromosomes:
         logger.critical(f"Duplicate chromosomes detected in sampleset {sampleset}")
         logger.critical(f"Duplicate chromosomes: {duplicate_chromosomes}")
@@ -106,9 +108,12 @@ def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
 
 def _check_multiple_missing_chrom(sampleset: str, chrom_list: dict) -> None:
     for chrom in chrom_list:
         if chrom is None and len(chrom_list) != 1:
-            logger.critical(f"Sampleset {sampleset} has rows with multiple missing chromosomes")
-            logger.critical("If you have a file with multiple chromosomes, delete the duplicate rows")
-            logger.critical("If your data are split per chromosome, then chromosomes must be set for all rows")
+            logger.critical(
+                f"Sampleset {sampleset} has rows with multiple missing chromosomes")
+            logger.critical(
+                "If you have a file with multiple chromosomes, delete the duplicate rows")
+            logger.critical(
+                "If your data are split per chromosome, then chromosomes must be set for all rows")
             raise Exception


@@ -126,7 +131,8 @@ def _check_format(df: pd.DataFrame):
     for idx, row in df.iterrows():
         valid_formats: list[str] = ['vcf', 'pfile', 'bfile']
         if row['format'] not in valid_formats:
-            logger.critical(f"Invalid format: {row['format']} must be one of {valid_formats}")
+            logger.critical(
+                f"Invalid format: {row['format']} must be one of {valid_formats}")
             logger.critical(f"\n{df.iloc[[idx]]}")
             raise Exception

@@ -149,7 +155,8 @@ def _setup_paths(df: pd.DataFrame) -> pd.DataFrame:
             case _:
                 raise Exception
 
-        resolved_paths: list[str] = _resolve_paths([row['path_prefix'] + x for x in suffix], row['format'])
+        resolved_paths: list[str] = _resolve_paths(
+            [row['path_prefix'] + x for x in suffix], row['format'])
         paths.append(pd.Series(data=[resolved_paths], index=[idx]))
 
     df['path'] = pd.concat(paths)
@@ -169,48 +176,79 @@ def _resolve_compressed_variant_path(path: str) -> pathlib.Path:
         logger.info(f"Found compressed variant information file {compressed_path.name}")
         return compressed_path
     elif uncompressed_path.exists():
-        logger.info(f"Couldn't find compressed variant information file, trying {uncompressed_path.name}")
+        logger.info(
+            f"Couldn't find compressed variant information file, trying {uncompressed_path.name}")
         return uncompressed_path
     else:
         logger.critical(f"{compressed_path} doesn't exist")
         logger.critical(f"{uncompressed_path} doesn't exist")
-        logger.critical("Couldn't find variant information files, please check samplesheet path_prefix and try again")
+        logger.critical(
+            "Couldn't find variant information files, please check samplesheet path_prefix and try again")
         raise Exception
 
 
 def _resolve_paths(path_list: list[str], filetype: str) -> list[str]:
     resolved_list: list[str] = []
-    for path in path_list:
-        if not Path(path).is_absolute():
-            logger.warning("Relative path detected in samplesheet. Set absolute paths to silence this warning.")
-            logger.warning("Assuming program working directory is a nextflow work directory (e.g. work/4c/8585/...)")
-            base_dir: Path = Path(os.getcwd()).parent.parent.parent
-            logger.warning(f"Resolving paths relative to work directory parent {base_dir}")
-            path = str(base_dir.joinpath(path))
-
-        match filetype:
-            case 'pfile' | 'bfile':
-                if path.endswith('.bim') or path.endswith('.pvar'):
-                    resolved = _resolve_compressed_variant_path(path)
-                else:
-                    # bed / pgen | fam / psam
-                    resolved = pathlib.Path(path).resolve()
-            case 'vcf':
-                resolved = pathlib.Path(path).resolve()
-            case _:
-                logger.critical(f"Unsupported filetype {filetype}")
-                raise Exception
-
-        if resolved.exists():
-            logger.info(f"{resolved} exists")
-            resolved_list.append(str(resolved))
-        else:
-            logger.critical(f"{resolved} doesn't exist, please check samplesheet path_prefix and try again")
-            raise FileNotFoundError
+    # always resolve the input samplesheet
+    base_dir: Path = Path(Config.input_path).resolve().parent
+    if (path := Path(Config.input_path)).is_symlink():
+        logger.info(
+            f"Input file {path} is symlinked, resolving to absolute path {path.resolve()}")
+
+    for path in path_list:
+        if path.startswith("https://") or path.startswith("s3://"):
+            logger.info("Remote path detected, skipping resolve")
+            resolved_list.append(str(path))
+            continue
+        elif path.startswith("http://"):
+            logger.critical("HTTP download is insecure! Did you mean https:// ?")
+            raise Exception("Insecure path detected")
+
+        p: Path = Path(path)
+        if not p.is_absolute():
+            logger.warning(
+                "Relative path detected in samplesheet. Set absolute paths to silence this warning.")
+            logger.warning(
+                "Assuming input samplesheet is a symlinked file in a nextflow working directory")
+            logger.warning(
+                "Following symlink and attempting to resolve path relative to input file")
+            logger.warning(
+                f"Resolving paths relative to: {base_dir}")
+            resolved = _resolve_filetypes(path=str(base_dir.joinpath(path)),
+                                          filetype=filetype)
+        else:
+            logger.info("Absolute path detected")
+            resolved = _resolve_filetypes(filetype=filetype, path=str(p))
+
+        if resolved.exists():
+            logger.info(f"{resolved} exists")
+            resolved_list.append(str(resolved))
+        else:
+            logger.critical(
+                f"{resolved} doesn't exist, please check samplesheet path_prefix and try again")
+            raise FileNotFoundError
 
     return resolved_list
 
 
+def _resolve_filetypes(filetype: str, path: str) -> Path:
+    match filetype:
+        case 'pfile' | 'bfile':
+            if path.endswith('.bim') or path.endswith('.pvar'):
+                resolved = _resolve_compressed_variant_path(path)
+            else:
+                # bed / pgen | fam / psam
+                resolved = pathlib.Path(path).resolve()
+        case 'vcf':
+            resolved = pathlib.Path(path).resolve()
+        case _:
+            logger.critical(f"Unsupported filetype {filetype}")
+            raise Exception
+
+    return resolved
+
+
 def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
     df['vcf_import_dosage'] = False  # (dosage off by default)
     if 'vcf_genotype_field' in df.columns:
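
The new resolution flow can be exercised on its own. A minimal sketch of its three branches, with hypothetical file names: https:// and s3:// paths pass through untouched, http:// is rejected, and relative local paths are joined to the resolved parent directory of the samplesheet:

    from pathlib import Path

    samplesheet = Path("results/samplesheet.csv")  # hypothetical input
    base_dir = samplesheet.resolve().parent        # symlinks are followed here

    entries = ["cineca.pgen", "https://example.org/chr1.vcf.gz", "http://example.org/x.vcf"]
    for entry in entries:
        if entry.startswith("https://") or entry.startswith("s3://"):
            print(f"remote, left untouched: {entry}")
        elif entry.startswith("http://"):
            print(f"rejected as insecure: {entry}")  # the real code raises here
        elif not Path(entry).is_absolute():
            print(f"resolved locally: {base_dir.joinpath(entry)}")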
@@ -224,7 +262,8 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
                 missing = False
 
             if not missing:
-                logger.critical(f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}")
+                logger.critical(
+                    f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}")
                 logger.critical(f"\n {row}")
                 raise Exception

@@ -237,14 +276,17 @@ def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame:
 
 def _check_reserved_names(df: pd.DataFrame):
     if any(df['sampleset'] == 'reference'):
-        logger.critical("Samplesets must not be named 'reference', please rename in the sample sheet")
+        logger.critical(
+            "Samplesets must not be named 'reference', please rename in the sample sheet")
         raise Exception
 
     # Check whether reference contains reserved tokens from nextflow channels
     badnames = [x for x in df['sampleset'] if ('.' in x or '_' in x)]
     if len(badnames) > 0:
-        logger.critical("Samplesets must not contain any reserved characters ( '_' , '.'), "
-                        "please rename the following samples in the sample sheet: {}".format(badnames))
+        logger.critical(
+            "Samplesets must not contain any reserved characters ( '_' , '.'), "
+            "please rename the following samples in the sample sheet: {}".format(
+                badnames))
         raise Exception
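
For reference, the reserved-character check is a single comprehension over the sampleset column; a quick illustration with hypothetical sampleset names:

    names = ['ukb', 'my_cohort', 'v1.2']
    badnames = [x for x in names if ('.' in x or '_' in x)]
    print(badnames)  # -> ['my_cohort', 'v1.2']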


@@ -269,7 +311,11 @@ def check_samplesheet() -> None:
     """
     args = _parse_args()
     config.set_logging_level(args.verbose)
-    df = _read_samplesheet(args.FILE_IN)
+
+    Config.input_path = args.FILE_IN
+    Config.output_path = args.FILE_OUT
+
+    df = _read_samplesheet(Config.input_path)
 
     # check df for errors
     _check_one_sampleset(df)
@@ -285,8 +331,8 @@ def check_samplesheet() -> None:
 
     logger.info("Samplesheet checks complete")
     (df.drop(['path_prefix'], axis=1)
-     .to_json(args.FILE_OUT, orient='records'))
-    logger.info(f"JSON file successfully written to {args.FILE_OUT}")
+     .to_json(Config.output_path, orient='records'))
+    logger.info(f"JSON file successfully written to {Config.output_path}")
 
 
 if __name__ == "__main__":
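
The heart of this PR is where relative samplesheet entries are anchored. A minimal before/after sketch (the samplesheet path is hypothetical):

    import os
    from pathlib import Path

    # Before: assumed the process sat three directory levels inside a
    # nextflow work directory and resolved relative entries against a guess.
    old_base = Path(os.getcwd()).parent.parent.parent

    # After: resolve relative entries against the real parent directory of
    # the input samplesheet; resolve() also follows the symlink that
    # nextflow stages into the work directory.
    new_base = Path("samplesheet.csv").resolve().parent  # hypothetical input

    print(old_base, new_base)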