diff --git a/README.md b/README.md index 32673e7..77136db 100644 --- a/README.md +++ b/README.md @@ -4,25 +4,31 @@ Umi-pipeline-nf ====================== -**Umi-pipeline-nf** creates highly accurate single-molecule consensus sequences for unique molecular identifier (UMI)-tagged amplicon data. +**Umi-pipeline-nf** creates highly accurate single-molecule consensus sequences for unique molecular identifier (UMI)-tagged amplicons from nanopore sequencing data. The pipeline can be run for the whole fastq_pass folder of your nanopore run and, per default, outputs the aligned consensus sequences of each UMI cluster in bam file. The optional variant calling creates a vcf file for all variants that are found in the consensus sequences. -umi-pipeline-nf is based on the snakemake [ONT UMI analysis pipeline](https://github.com/nanoporetech/pipeline-umi-amplicon) (workflow originally developed by [Karst et al, Nat Biotechnol 18:165–169, 2021](https://www.nature.com/articles/s41592-020-01041-y)). We transferred the pipeline to [Nextflow](https://www.nextflow.io) and included [additional functionalities](#main-adaptations). +umi-pipeline-nf is inspired by a snakemake-based analysis pipeline ([ONT UMI analysis pipeline](https://github.com/nanoporetech/pipeline-umi-amplicon); originally developed by [Karst et al, Nat Methods 18:165–169, 2021](https://www.nature.com/articles/s41592-020-01041-y)). We migrated the pipeline to [Nextflow](https://www.nextflow.io) and included several optimizations and [additional functionalities](#main-adaptations). + +![Workflow](docs/images/umi-pipeline-nf_metro-map.svg) ## Workflow -1. Input reads are aligned against a reference genome. -2. The flanking UMI sequences of all reads are extracted. -3. The extracted UMIs are used to cluster the reads. -4. Per cluster, highly accurate consensus sequences are created. -5. The consensus sequences are aligned against the reference sequenced. -6. An optional variant calling step can be performed. +1. Input FASTQ files are merged and filtered. +2. Reads are aligned against a reference genome and filtered to keep only full-length on-target reads. +3. The flanking UMI sequences of all reads are extracted. +4. The extracted UMIs are used to cluster the reads. +5. Per cluster, highly accurate consensus sequences are created. +6. The consensus sequences are aligned against the reference sequence. +7. An optional variant calling step can be performed on the consensus sequences. +8. UMI extraction, clustering, consensus sequence creation, and mapping are repeated. +9. An optional variant calling step can be performed on the final consensus sequences. > See the [output documentation](docs/output.md) for a detailed overview of the pipeline and its output files. ## Main Adaptations -* It comes with docker containers making **installation simple, portable** and **results highly reproducible**. +* It comes with a docker/singularity container, making **installation simple**, **use on clusters easy**, and **results highly reproducible**. * The pipeline is **optimized for parallelization**. +* An **additional UMI cluster splitting** step removes admixed UMI clusters (see the sketch after this list). * Read filtering strategy per UMI cluster was adapted to **preserve the highest quality reads**. * **Three commonly used variant callers** ([freebayes](https://github.com/freebayes/freebayes), [lofreq](http://csb5.github.io/lofreq/) or [mutserve](https://mitoverse.readthedocs.io/mutserve/mutserve/)) are supported by the pipeline. * The raw reads can be optionally **subsampled**. 
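To make the new **UMI cluster splitting** adaptation concrete, here is a minimal, hypothetical Python sketch of the idea (the function name `split_cluster` and the example UMIs are illustrative and not part of the pipeline; the actual implementation lives in `bin/parse_clusters.py` further down in this diff): reads whose UMI is within a maximum edit distance of the subcluster's first UMI stay together, and the remaining reads seed the next subcluster.

```python
import edlib

def split_cluster(umis, max_edit_dist=2):
    """Greedily partition UMI strings into subclusters of similar UMIs.

    Illustrative sketch only: the first remaining UMI acts as the parent of a
    new subcluster; UMIs within max_edit_dist of it join that subcluster, and
    everything else is carried over to seed the next one.
    """
    subclusters = []
    remaining = list(umis)
    while remaining:
        parent = remaining[0]
        close, distant = [], []
        for umi in remaining:
            # mode="HW" (infix) with cut-off k: editDistance == -1 means the
            # UMI is more than k edits away from the parent UMI.
            hit = edlib.align(parent, umi, mode="HW", k=max_edit_dist)
            (close if hit["editDistance"] != -1 else distant).append(umi)
        subclusters.append(close)
        remaining = distant
    return subclusters

# Two admixed UMI groups end up in two separate subclusters.
print(split_cluster(["AACCGGTT", "AACCGGTA", "TTGGCCAA", "TTGGCCAT"]))
```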
@@ -32,19 +38,19 @@ umi-pipeline-nf is based on the snakemake [ONT UMI analysis pipeline](https://gi ## Quick Start -1. Install [`nextflow`](https://www.nextflow.io/) +1. Install [`nextflow`](https://www.nextflow.io/). -2. Download the pipeline and test it on a minimal dataset with a single command +2. Download the pipeline and test it on a [minimal dataset](data/info.txt) with a single command. ```bash -nextflow run genepi/umi-pipeline-nf -profile test,docker +nextflow run genepi/umi-pipeline-nf -r v0.1.0 -profile test,docker ``` -3. Start running your own analysis! -3.1 Download and adapt the config/custom.config with paths to your data (relative and absolute paths possible) +3. Start running your own analysis! +3.1 Download and adapt the config/custom.config with paths to your data (relative and absolute paths possible). ```bash -nextflow run genepi/umi-pipeline-nf -r main -c -profile docker +nextflow run genepi/umi-pipeline-nf -r v0.1.0 -c -profile docker ``` @@ -52,5 +58,5 @@ nextflow run genepi/umi-pipeline-nf -r main -c -profile docker The pipeline was written by ([@StephanAmstler](https://github.com/AmstlerStephan)). Nextflow template pipeline: [EcSeq](https://github.com/ecSeq). -Original snakemake-based pipeline: [nanoporetech/pipeline-umi-amplicon](https://github.com/nanoporetech/pipeline-umi-amplicon). +Snakemake-based ONT pipeline: [nanoporetech/pipeline-umi-amplicon](https://github.com/nanoporetech/pipeline-umi-amplicon). Original workflow: [SorenKarst/longread_umi](https://github.com/SorenKarst/longread_umi). diff --git a/bin/extract_umis.py b/bin/extract_umis.py index 6ab02b3..24640bc 100755 --- a/bin/extract_umis.py +++ b/bin/extract_umis.py @@ -45,10 +45,10 @@ def parse_args(argv): help="Max edit distance for UMI", ) parser.add_argument( - "--adapter-length", + "--adapter_length", dest="ADAPTER_LENGTH", type=int, - default=250, + default=200, help="Length of adapter", ) parser.add_argument( @@ -106,10 +106,27 @@ def parse_args(argv): return args -def extract_umi(query_seq, query_qual, pattern, max_edit_dist, format): +def clip_entry(entry, umi_start_fwd, umi_start_rev, adapter_length, format): + clip_fwd = umi_start_fwd + if adapter_length == umi_start_rev: + clip_rev = None + else: + clip_rev = umi_start_rev - adapter_length + + entry.sequence = entry.sequence[clip_fwd:clip_rev] + + if format == "fastq": + entry.quality = entry.quality[clip_fwd:clip_rev] + + return entry + +def extract_umi(query_seq, query_qual, pattern, max_edit_dist, format, direction): umi_qual = None equalities = [("M", "A"), ("M", "C"), ("R", "A"), ("R", "G"), ("W", "A"), ("W", "T"), ("S", "C"), ("S", "G"), ("Y", "C"), ("Y", "T"), ("K", "G"), ("K", "T"), ("V", "A"), ("V", "C"), - ("V", "G"), ("H", "A"), ("H", "C"), ("H", "T"), ("D", "A"), ("D", "G"), ("D", "T"), ("B", "C"), ("B", "G"), ("B", "T"), ("N", "A"), ("N", "C"), ("N", "G"), ("N", "T")] + ("V", "G"), ("H", "A"), ("H", "C"), ("H", "T"), ("D", "A"), ("D", "G"), ("D", "T"), ("B", "C"), ("B", "G"), ("B", "T"), ("N", "A"), ("N", "C"), ("N", "G"), ("N", "T"), + ("m", "a"), ("m", "c"), ("r", "a"), ("r", "g"), ("w", "a"), ("w", "t"), ("s", "c"), ("s", "g"), ("y", "c"), ("y", "t"), ("k", "g"), ("k", "t"), ("v", "a"), ("v", "c"), + ("v", "g"), ("h", "a"), ("h", "c"), ("h", "t"), ("d", "a"), ("d", "g"), ("d", "t"), ("b", "c"), ("b", "g"), ("b", "t"), ("n", "a"), ("n", "c"), ("n", "g"), ("n", "t"), + ("a", "A"), ("c", "C"), ("t", "T"), ("g", "G")] result = edlib.align( pattern, @@ -120,31 +137,38 @@ def extract_umi(query_seq, query_qual, pattern, 
max_edit_dist, format): additionalEqualities=equalities, ) if result["editDistance"] == -1: - return None, None, None + return None, None, None, None edit_dist = result["editDistance"] locs = result["locations"][0] - umi = query_seq[locs[0]:locs[1]+1] + umi_start_pos = locs[0] + umi_end_pos = locs[1] + 1 + umi = query_seq[umi_start_pos:umi_end_pos] + + if direction == "fwd": + umi_start = umi_start_pos + else: + umi_start = umi_end_pos if format == "fastq": umi_qual = query_qual[locs[0]:locs[1]+1] - return edit_dist, umi, umi_qual + return edit_dist, umi, umi_qual, umi_start -def extract_adapters(entry, max_adapter_length, format): +def extract_adapters(entry, adapter_length, format): read_5p_seq = None read_5p_qual = None read_3p_seq = None read_3p_qual = None - if len(entry.sequence) > max_adapter_length: - read_5p_seq = entry.sequence[:max_adapter_length] - read_3p_seq = entry.sequence[-max_adapter_length:] + if len(entry.sequence) > adapter_length: + read_5p_seq = entry.sequence[:adapter_length] + read_3p_seq = entry.sequence[-adapter_length:] if format == "fastq": - read_5p_qual = entry.quality[:max_adapter_length] - read_3p_qual = entry.quality[-max_adapter_length:] + read_5p_qual = entry.quality[:adapter_length] + read_3p_qual = entry.quality[-adapter_length:] return read_5p_seq, read_3p_seq, read_5p_qual, read_3p_qual @@ -288,7 +312,7 @@ def write_tsv( def extract_umis( args ): - max_adapter_length = args.ADAPTER_LENGTH + adapter_length = args.ADAPTER_LENGTH max_pattern_dist = args.MAX_ERROR output_folder = args.OUT tsv = args.TSV @@ -311,7 +335,7 @@ def extract_umis( n_total += 1 read_5p_seq, read_3p_seq, read_5p_qual, read_3p_qual = extract_adapters( - entry, max_adapter_length, format + entry, adapter_length, format ) if read_5p_seq is None or read_3p_seq is None: @@ -320,22 +344,26 @@ def extract_umis( strand_stats[strand] += 1 # Extract fwd UMI - result_5p_fwd_umi_dist, result_5p_fwd_umi_seq, result_5p_fwd_umi_qual = extract_umi( - read_5p_seq, read_5p_qual, umi_fwd, max_pattern_dist, format + result_5p_fwd_umi_dist, result_5p_fwd_umi_seq, result_5p_fwd_umi_qual, umi_start_fwd = extract_umi( + read_5p_seq, read_5p_qual, umi_fwd, max_pattern_dist, format, "fwd" ) # Extract rev UMI - result_3p_rev_umi_dist, result_3p_rev_umi_seq, result_3p_rev_umi_qual = extract_umi( - read_3p_seq, read_3p_qual, umi_rev, max_pattern_dist, format + result_3p_rev_umi_dist, result_3p_rev_umi_seq, result_3p_rev_umi_qual, umi_start_rev = extract_umi( + read_3p_seq, read_3p_qual, umi_rev, max_pattern_dist, format, "rev" ) if not result_5p_fwd_umi_seq or not result_3p_rev_umi_seq: continue + clipped_entry = clip_entry( + entry, umi_start_fwd, umi_start_rev, adapter_length, format + ) + n_both_umi += 1 if format == "fasta": write_fasta( - entry, + clipped_entry, strand, result_5p_fwd_umi_dist, result_3p_rev_umi_dist, @@ -346,7 +374,7 @@ def extract_umis( if format == "fastq": write_fastq( - entry, + clipped_entry, strand, result_5p_fwd_umi_dist, result_3p_rev_umi_dist, diff --git a/bin/filter_reads.py b/bin/filter_reads.py index e7f822c..c42f4a5 100644 --- a/bin/filter_reads.py +++ b/bin/filter_reads.py @@ -48,6 +48,14 @@ def parse_args(argv): default=0.9, help="Min overlap with target region", ) + + parser.add_argument( + "--adapter_length", + dest="ADAPTER_LENGTH", + type=int, + default=200, + help="Length of adapter", + ) parser.add_argument( "--include_secondary_reads", @@ -160,7 +168,7 @@ def write_fastq(read, out_f): def filter_reads(args): bed_regions = args.BED[0] bam_file = args.BAM - 
max_clipping = 250 + adapter_length = args.ADAPTER_LENGTH min_overlap = args.MIN_OVERLAP incl_sec = args.INCL_SEC output = args.OUT @@ -177,12 +185,7 @@ def filter_reads(args): n_supplementary = 0 n_secondary = 0 n_total = 0 - - filtered_perc = 0 - unmapped_perc = 0 - ontarget_perc = 0 - concatermer_perc = 0 - short_perc = 0 + n_long = 0 with pysam.AlignmentFile(bam_file, "rb") as bam: region = parse_bed(bed_regions) @@ -204,8 +207,8 @@ def filter_reads(args): continue if read.is_secondary: + n_secondary += 1 if not incl_sec: - n_secondary += 1 write_read(read, output, region, "secondary", out_format) continue @@ -215,17 +218,21 @@ def filter_reads(args): continue n_ontarget += 1 - if read.query_alignment_length < ( - read.query_length - 2 * max_clipping - ): + if read.query_alignment_length < (read.query_length - 2 * adapter_length): n_concatamer += 1 write_read(read, output, region, "concatamer", out_format) continue - if read.reference_length < (region_length * min_overlap): + if read.query_alignment_length < (region_length * min_overlap): n_short += 1 write_read(read, output, region, "short", out_format) continue + + if read.query_length > (region_length * ( 2 - min_overlap) + 2 * adapter_length): + n_long += 1 + write_read(read, output, region, "long", out_format) + continue + n_reads_region += 1 write_read(read, output, region, "filtered", out_format) @@ -233,15 +240,15 @@ def filter_reads(args): stats_out_filename = os.path.join( output, "{}.tsv".format(stats_out_filename)) write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, - n_concatamer, n_short, n_reads_region, incl_sec, stats_out_filename, region) + n_concatamer, n_short, n_long, n_reads_region, incl_sec, stats_out_filename, region) -def write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, n_concatamer, n_short, n_reads_region, incl_sec, stats_out_filename, region): +def write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, n_concatamer, n_short, n_long, n_reads_region, incl_sec, stats_out_filename, region): if n_total > 0: if incl_sec: - filtered_perc = 100 * n_supplementary // n_total + filtered_perc = 100 * n_reads_region // n_total else: - filtered_perc = 100 * (n_secondary + n_supplementary) // n_total + filtered_perc = 100 * (n_secondary + n_reads_region) // n_total unmapped_perc = 100 * n_unmapped // n_total secondary_perc = 100 * n_secondary // n_total @@ -251,6 +258,7 @@ def write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, n_c if ontarget_perc > 0: concatermer_perc = 100 * n_concatamer // n_ontarget short_perc = 100 * n_short // n_ontarget + long_perc = 100 * n_long // n_ontarget with open(stats_out_filename, "a") as out_f: print( @@ -263,6 +271,7 @@ def write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, n_c "reads_on_target", "reads_concatamer", "reads_short", + "reads_long", "reads_filtered", "include_secondary", sep="\t", @@ -278,6 +287,7 @@ def write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, n_c n_ontarget, n_concatamer, n_short, + n_long, n_reads_region, incl_sec, sep="\t", @@ -293,6 +303,7 @@ def write_tsv(n_total, n_unmapped, n_secondary, n_supplementary, n_ontarget, n_c ontarget_perc, concatermer_perc, short_perc, + long_perc, filtered_perc, incl_sec, sep="\t", diff --git a/bin/parse_clusters.py b/bin/parse_clusters.py index 059a290..195a71b 100644 --- a/bin/parse_clusters.py +++ b/bin/parse_clusters.py @@ -2,8 +2,12 @@ import logging import os import sys +import time + +import 
threading as t import pysam +import edlib def parse_args(argv): @@ -51,7 +55,6 @@ def parse_args(argv): type=str, default="random", help="[ random | quality ] Choose strategy to remove reads. Only if --balance_strands is set" - # TODO Check vsearch: Reads might be sorted by length, so filtering would not be random! ) parser.add_argument( @@ -71,19 +74,23 @@ def parse_args(argv): ) parser.add_argument( - "-o", "--output", dest="OUTPUT", required=True, help="Output folder" + "--max_dist_umi", + dest="MAX_EDIT_DIST", + type=int, + default=2, + help="Max distance of UMIs per cluster. Used to split cluster into subclusters", ) - + parser.add_argument( - "--tsv", dest="TSV", action="store_true", help="write TSV output file" + "-o", "--output", dest="OUTPUT", required=True, help="Output folder" ) parser.add_argument( - "--vsearch_consensus", dest="VSEARCH_CONSENSUS", required=True, type=str, help="VSearch consensus FASTX" + "--tsv", dest="TSV", action="store_true", help="write TSV output file" ) parser.add_argument( - "--vsearch_folder", dest="VSEARCH_FOLDER", required=True, type=str, help="VSearch cluster folder" + "--clusters", dest="CLUSTERS", nargs= "+", required=True, type=str, help="cluster fastxs" ) parser.add_argument( @@ -100,44 +107,66 @@ def parse_args(argv): def get_cluster_id(cluster): - return cluster.name.split(";")[-1].split("=")[1] + return cluster.split("cluster")[-1] -def get_cluster_seq_n(cluster): - return int(cluster.name.split(";")[-2].split("=")[1]) +def get_read_seq(read): + return read.name.split(";seq=")[1].split(";")[0] -def get_read_seq(entry): - return entry.name.split(";seq=")[1].split(";")[0] +def get_read_qual(read): + return read.name.split(";qual=")[1] -def get_read_qual(entry): - return entry.name.split(";qual=")[1] +def get_read_strand(read): + return read.name.split(";strand=")[1].split(";")[0] -def get_read_strand(entry): - return entry.name.split("strand=")[1].split(";")[0] +def get_read_name(read): + return read.name.split(";")[0] -def get_read_name(entry): - return entry.name.split(";")[0] - -def get_read_mean_qual(entry): - qual = get_read_qual(entry) +def get_read_mean_qual(read): + qual = get_read_qual(read) return get_mean_qual(qual) -def get_read_qual(entry): - return entry.name.split("qual=")[1].split(";seqs=")[0] - def get_mean_qual(qual): return sum(map(lambda char: ord(char), qual)) / len(qual) -def get_split_reads(cluster_fasta_umi): +def get_reads(cluster): + residual_reads = [] + n_residual_reads = 0 + with pysam.FastxFile(cluster) as reads: + for read in reads: + residual_reads.append(read) + n_residual_reads += 1 + return residual_reads, n_residual_reads + +def get_split_cluster(reads, max_edit_dist): + subcluster = [] + distant_reads = [] + parent = reads[0].sequence + + for read in reads: + # calculate edit distance between parent and all other reads in the cluster + result = edlib.align( + parent, + read.sequence, + mode="HW", + k=max_edit_dist + ) + if result["editDistance"] == -1: + distant_reads.append(read) + else: + subcluster.append(read) + return subcluster, distant_reads + + +def get_split_reads(reads): reads_fwd = [] reads_rev = [] - with pysam.FastxFile(cluster_fasta_umi) as fh: - for entry in fh: - strand = get_read_strand(entry) - - if strand == "+": - reads_fwd.append(entry) - else: - reads_rev.append(entry) + + for read in reads: + strand = get_read_strand(read) + if strand == "+": + reads_fwd.append(read) + else: + reads_rev.append(read) return reads_fwd, reads_rev def get_sorted_reads(reads): @@ -184,119 +213,35 @@ def 
get_filtered_reads(reads_fwd, reads_rev, reads_found, n_fwd, n_rev, min_read return reads_fwd, reads_rev, write_cluster, skipped_fwd, skipped_rev -def polish_cluster( - cluster_id, - vsearch_folder, - output_folder, - min_reads, - max_reads, - filter, - stats_out_filename, - tsv, - balance_strands, - smolecule_out, - format -): - reads_found = 0 - reads_written = 0 - reads_written_fwd = 0 - reads_written_rev = 0 - - reads_skipped = 0 - cluster_written = 0 - - reads_fwd = [] - reads_rev = [] - - cluster_fasta_umi = os.path.join( - vsearch_folder, "cluster{}".format(cluster_id)) - parsed_cluster = os.path.join( - output_folder, "cluster{}.{}".format(cluster_id, format)) - smolecule_file = os.path.join( - output_folder, "smolecule{}.{}".format(cluster_id, format)) - - parse_cluster(cluster_fasta_umi, parsed_cluster, format) - - reads_fwd, reads_rev = get_split_reads(cluster_fasta_umi) - n_fwd = len(reads_fwd) - n_rev = len(reads_rev) - reads_found = n_fwd + n_rev - - # Fail fast if no stat file must be written - if not tsv: - if reads_found < min_reads: - cluster_written = 0 - reads_written = 0 - reads_skipped = reads_found - return cluster_written, reads_found, reads_skipped, reads_written - else: - if filter == "quality": - reads_fwd = get_sorted_reads(reads_fwd) - reads_rev = get_sorted_reads(reads_rev) - - #logging.info( isinstance(reads_fwd, list), isinstance(reads_rev, list), cluster_id) - reads_fwd, reads_rev, write_cluster, reads_skipped_fwd, reads_skipped_rev = get_filtered_reads( - reads_fwd, reads_rev, reads_found, n_fwd, n_rev, min_reads, max_reads, balance_strands) - - if write_cluster: - cluster_written = 1 - reads_written_fwd = len(reads_fwd) - reads_written_rev = len(reads_rev) - - reads = reads_fwd + reads_rev - write_smolecule(cluster_id, reads, smolecule_file, format) - else: - cluster_written = 0 - - if tsv: - write_tsv_line(stats_out_filename, cluster_id, cluster_written, reads_found, n_fwd, - n_rev, reads_written_fwd, reads_written_rev, reads_skipped_fwd, reads_skipped_rev) - - return ( - cluster_written, - reads_found, - reads_skipped_fwd + reads_skipped_rev, - reads_written_rev + reads_written_fwd - ) - - -def parse_cluster(cluster_fasta_umi, parsed_cluster, format): - with open(parsed_cluster, "w") as out_f: - with pysam.FastxFile(cluster_fasta_umi) as fh: - for entry in fh: - read_name = get_read_name(entry) - seq = get_read_seq(entry) - if format == "fastq": - qual = get_read_qual(entry) - write_fastq_entry(read_name, seq, qual, out_f) - else: - write_fasta_entry(read_name, seq, out_f) - - def write_smolecule(cluster_id, reads, smolecule_file, format): with open(smolecule_file, "w") as out_f: - for n, entry in enumerate(reads): - seq = get_read_seq(entry) + for n, read in enumerate(reads): + seq = get_read_seq(read) read_name = "{}_{}".format(cluster_id, n) if format == "fastq": - qual = get_read_qual(entry) - write_fastq_entry(read_name, seq, qual, out_f) + qual = get_read_qual(read) + write_fastq_read(read_name, seq, qual, out_f) else: - write_fasta_entry(read_name, seq, out_f) + write_fasta_read(read_name, seq, out_f) -def write_fastq_entry(read_name, read_seq, qual, out_f): +def write_fastq_read(read_name, read_seq, qual, out_f): print("@{}".format(read_name), file=out_f) print("{}".format(read_seq), file=out_f) print("+", file=out_f) print("{}".format(qual), file=out_f) -def write_fasta_entry(read_name, read_seq, out_f): +def write_fasta_read(read_name, read_seq, out_f): print(">{}".format(read_name), file=out_f) print("{}".format(read_seq), file=out_f) - +def 
write_subcluster(subcluster, subcluster_name): + with open(subcluster_name, "w") as out_f: + for read in subcluster: + print(">{}".format(read.name), file=out_f) + print("{}".format(read.sequence), file=out_f) + def write_tsv_line(stats_out_filename, cluster_id, cluster_written, reads_found, n_fwd, n_rev, reads_written_fwd, reads_written_rev, reads_skipped_fwd, reads_skipped_rev): with open(stats_out_filename, "a") as out_f: print(cluster_id, @@ -312,81 +257,83 @@ def write_tsv_line(stats_out_filename, cluster_id, cluster_written, reads_found, file=out_f, ) -def parse_clusters(args): - cluster_filename = args.VSEARCH_CONSENSUS - min_read_per_cluster = args.MIN_CLUSTER_READS - max_read_per_cluster = args.MAX_CLUSTER_READS +def parse_cluster(min_reads, max_reads, filter, format, cluster, output_folder, balance_strands, tsv, max_edit_dist, stats_out_filename): + residual_reads, n_residual_reads = get_reads(cluster) + n_subcluster = 0 + cluster_id = get_cluster_id(cluster) + + while n_residual_reads >= min_reads: + reads_found = 0 + reads_found = 0 + reads_written_fwd = 0 + reads_written_rev = 0 + cluster_written = 0 + + cluster_id_subcluster = "{}sub{}".format(cluster_id, n_subcluster) + smolecule_file = os.path.join( + output_folder, "smolecule{}.{}".format(cluster_id_subcluster, format)) + + subcluster, residual_reads = get_split_cluster(residual_reads, max_edit_dist) + n_residual_reads = len(residual_reads) + write_subcluster( + subcluster, + os.path.join(output_folder, "{}_{}".format(cluster, n_subcluster)) + ) + n_subcluster += 1 + + reads_fwd, reads_rev = get_split_reads(subcluster) + n_fwd = len(reads_fwd) + n_rev = len(reads_rev) + reads_found = n_fwd + n_rev + + if filter == "quality": + reads_fwd = get_sorted_reads(reads_fwd) + reads_rev = get_sorted_reads(reads_rev) + + reads_fwd, reads_rev, write_cluster, reads_skipped_fwd, reads_skipped_rev = get_filtered_reads( + reads_fwd, reads_rev, reads_found, n_fwd, n_rev, min_reads, max_reads, balance_strands) + + if write_cluster: + cluster_written = 1 + reads_written_fwd = len(reads_fwd) + reads_written_rev = len(reads_rev) + + reads = reads_fwd + reads_rev + write_smolecule(cluster_id_subcluster, reads, smolecule_file, format) + else: + cluster_written = 0 + + if tsv: + write_tsv_line(stats_out_filename, cluster_id_subcluster, cluster_written, reads_found, n_fwd, + n_rev, reads_written_fwd, reads_written_rev, reads_skipped_fwd, reads_skipped_rev) + +def parse_cluster_wrapper(args): + min_reads = args.MIN_CLUSTER_READS + max_reads = args.MAX_CLUSTER_READS filter = args.FILTER format = args.OUT_FORMAT - vsearch_folder = args.VSEARCH_FOLDER - output = args.OUTPUT + clusters = args.CLUSTERS + output_folder = args.OUTPUT balance_strands = args.BAL_STRANDS tsv = args.TSV - - smolecule_filename = "smolecule_clusters" - stats_out_filename = "vsearch_cluster_stats" - n_clusters = 0 - n_written = 0 - reads_found = 0 - reads_skipped = 0 - reads_written = 0 - + max_edit_dist = args.MAX_EDIT_DIST + + stats_out_filename = "split_cluster_stats" + if tsv: stats_out_filename = os.path.join( - output, "{}.tsv".format(stats_out_filename)) + output_folder, "{}.tsv".format(stats_out_filename)) write_tsv_line(stats_out_filename, "cluster_id", "cluster_written", "reads_found", "reads_found_fwd", - "reads_found_rev", "reads_written_fwd", "reads_written_rev", "reads_skipped_fwd", "reads_skipped_rev") - - with pysam.FastxFile(cluster_filename) as fh: - for cluster in fh: - - # cons_umi = None cluster.sequence.replace("T", "") - cluster_id = 
get_cluster_id(cluster) - - a, b, c, d = polish_cluster( - cluster_id, - vsearch_folder, - output, - min_read_per_cluster, - max_read_per_cluster, - filter, - stats_out_filename, - tsv, - balance_strands, - smolecule_filename, - format - ) - n_clusters += 1 - n_written += a - reads_found += b - reads_skipped += c - reads_written += d - - if n_written == 0 or reads_found == 0: - raise RuntimeError( - "ERROR - did not find a single cluster passing the min_read threashold!" - ) - - logging.info( - "Clusters: {}% written ({})".format( - n_written * 100.0 / n_clusters, n_written - ) - ) - logging.info( - "Reads: {} found".format( - reads_found - ) - ) - - logging.info( - "Reads: {} removed ({}%)".format( - reads_skipped, reads_skipped * 100.0 // reads_found - ) - ) - logging.info("Reads: {} written ({}%)".format( - reads_written, reads_written * 100.0 // reads_found)) + "reads_found_rev", "reads_written_fwd", "reads_written_rev", "reads_skipped_fwd", "reads_skipped_rev") + for cluster in clusters: + # parse_cluster_thread = t.Thread(target=parse_cluster, args=( + # min_reads, max_reads, filter, format, cluster, output_folder, balance_strands, tsv, max_edit_dist, stats_out_filename + # )) + # parse_cluster_thread.start() + parse_cluster(min_reads, max_reads, filter, format, cluster, output_folder, balance_strands, tsv, max_edit_dist, stats_out_filename) + def main(argv=sys.argv[1:]): """ Basic command line interface to telemap. @@ -404,7 +351,7 @@ def main(argv=sys.argv[1:]): logging.basicConfig(level=numeric_level, format="%(message)s") try: - parse_clusters(args) + parse_cluster_wrapper(args) except RuntimeError as e: logging.error(e) diff --git a/bin/reformat_consensus.py b/bin/reformat_consensus.py index e00b0e9..8bb8b71 100755 --- a/bin/reformat_consensus.py +++ b/bin/reformat_consensus.py @@ -1,6 +1,7 @@ import argparse import logging import sys +import os import pysam @@ -36,6 +37,14 @@ def parse_args(argv): help="Print debug information", ) + parser.add_argument( + "--consensus_fasta", dest="CONS_FASTA", required=True, type=str, help="consensus fasta file" + ) + + parser.add_argument( + "-o", "--output", dest="OUTPUT", required=True, help="Output folder" + ) + parser.add_argument( "-t", "--threads", dest="THREADS", type=int, default=1, help="Number of threads." 
) @@ -44,19 +53,29 @@ def parse_args(argv): return args +def get_read_seq(read): + return read.name.split(";seq=")[1].split(";")[0] + +def get_read_qual(read): + return read.name.split(";qual=")[1].split(";seqs=")[0] + +def get_read_name(read): + return read.name.split(";")[0] def parse_stdin(args): - cluster_filename = "/dev/stdout" - consensus_filename = "/dev/stdin" - - with open(cluster_filename, "w") as out: - with pysam.FastxFile(consensus_filename) as fh: - for entry in fh: - cols = entry.name.split(";") - read_id = cols[0] - read_seq = cols[6].split("=")[1] - print(">{}".format(read_id), file=out) - print(read_seq, file=out) + consensus_filename = args.CONS_FASTA + output_folder = args.OUTPUT + cluster_filename = os.path.join(output_folder, "final.fastq") + + with open(cluster_filename, "w") as out, pysam.FastxFile(consensus_filename) as reads: + for read in reads: + read_name = get_read_name(read) + read_seq = get_read_seq(read) + read_qual = get_read_qual(read) + print("@{}".format(read_name), file=out) + print(read_seq, file=out) + print("+", file=out) + print(read_qual, file=out) def main(argv=sys.argv[1:]): diff --git a/bin/setup.py b/bin/setup.py deleted file mode 100755 index a0501ba..0000000 --- a/bin/setup.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -UMI amplicion tools setup script - -Copyright (c) 2020 by Oxford Nanopore Technologies Ltd. -""" -import os -from setuptools import setup - -__version__ = '1.0.0' - -setup( - name='umi-amplicon-tools', - version=__version__, - author='Nanopore Technologies Ltd.', - description='Toolset to work with ONT amplicon sequencing using UMIs', - zip_safe=False, - install_requires=[ - 'pysam', - 'numpy', - 'pandas', - 'seaborn', - 'edlib' - ], - packages=['umi_amplicon_tools'], - package_data={ - 'umi_amplicon_tools': ['data/*'], - }, - entry_points={ - "console_scripts": [ - 'umi_filter_reads = umi_amplicon_tools.filter_reads:main', - 'umi_extract = umi_amplicon_tools.extract_umis:main', - 'umi_reformat_consensus = umi_amplicon_tools.reformat_consensus:main', - 'umi_parse_clusters = umi_amplicon_tools.parse_clusters:main', - 'umi_stats = umi_amplicon_tools.umi_stats:main' - ] - }, -) - diff --git a/config/base.config b/config/base.config index e1e2307..6da52d0 100644 --- a/config/base.config +++ b/config/base.config @@ -3,30 +3,11 @@ // PROCESS RESOURCES process { - withLabel: "MemoryDemandingTaks" { - errorStrategy = 'retry' - memory = { 20.GB * task.attempt } - } - // top-level configuration labels for groups of processes - withLabel: "low" { - time = { 2.h * task.attempt } - memory = { 2.GB * task.attempt } - cpus = { 2 * task.attempt } - } - - withLabel: "high" { - time = { 16.h * task.attempt } - memory = { 24.GB * task.attempt } - cpus = { 8 * task.attempt } + withName: "POLISH_CLUSTER" { + memory = { 10.GB * task.attempt } + cpus = 2 } - // label processes which should kill the pipeline if they fail - withLabel: "finish" { - errorStrategy = { task.exitStatus in [140,143,137,104,134,139] ? 'retry' : 'finish' } - } - - // label processes which can be safely ignored if they fail - withLabel: "ignore" { - errorStrategy = { task.exitStatus in [140,143,137,104,134,139] ? 
'retry' : 'ignore' } - } + errorStrategy = 'retry' + maxRetries = 3 } diff --git a/config/test.config b/config/test.config index ccfdaa9..281bf74 100644 --- a/config/test.config +++ b/config/test.config @@ -20,7 +20,7 @@ params { subsampling = false - min_reads_per_cluster = 18 + min_reads_per_cluster = 10 max_reads_per_cluster = 20 write_reports = true @@ -37,17 +37,21 @@ if(params.output != null){ dag { enabled = true file = "${params.output}/nextflow_stats/dag.mmd" + overwrite = true } report { enabled = true file = "${params.output}/nextflow_stats/report.html" + overwrite = true } timeline { enabled = true file = "${params.output}/nextflow_stats/timeline.html" + overwrite = true } trace { enabled = true file = "${params.output}/nextflow_stats/trace.txt" + overwrite = true } } diff --git a/data/info.txt b/data/info.txt index 60fdab9..a668052 100644 --- a/data/info.txt +++ b/data/info.txt @@ -1,7 +1,22 @@ -This folder contains testdata to run the pipeline from a proof-of-principle run -(run9, barcode04 [=barcode01] and barcode07 [=barcode02]) +This folder contains test data to run the pipeline from a proof-of-principle run. +The test data originated from mixtures of two plasmids containing different sequences (KIV-2A or KIV-2B). -The test data originated from mixtures of two plasmids containing different sequences. -Barcode01 has a mixture level of: -Barcode02 has a mixture level of: +Barcode02: +Raw reads: 10,336 +Found clusters: 1,383 +Clusters equal or above the minimal cluster size (18): 30 +Clusters equal or above the minimal cluster size (18) after read balancing: 14 +Consensus sequences before polishing: 14 +Final consensus sequences after polishing: 13 +1 Sequence KIV-2B can be found in both (consensus and final) bam files. + +Barcode03: +Barcode03 has a mixture level of 33% KIV-2A and 67% KIV-2B. +Raw reads: 6,509 +Found clusters: 240 +Clusters equal or above the minimal cluster size (18): 104 +Clusters equal or above the minimal cluster size (18) after read balancing: 14 +Consensus sequences before polishing: 91 +Final consensus sequences after polishing: 76 +51 Sequence KIV-2B can be found in both (consensus and final) bam files. diff --git a/docs/images/Flowchart.png b/docs/images/Flowchart.png new file mode 100644 index 0000000..4e3844f Binary files /dev/null and b/docs/images/Flowchart.png differ diff --git a/docs/images/Flowchart_UMI_nf.jpg b/docs/images/Flowchart_UMI_nf.jpg deleted file mode 100644 index 069823d..0000000 Binary files a/docs/images/Flowchart_UMI_nf.jpg and /dev/null differ diff --git a/docs/images/Umi_pipeline_without_output_Flowchart.png b/docs/images/Umi_pipeline_without_output_Flowchart.png deleted file mode 100644 index 570f1ec..0000000 Binary files a/docs/images/Umi_pipeline_without_output_Flowchart.png and /dev/null differ diff --git a/docs/images/umi-pipeline-nf_metro-map.svg b/docs/images/umi-pipeline-nf_metro-map.svg new file mode 100644 index 0000000..bbf4b8b --- /dev/null +++ b/docs/images/umi-pipeline-nf_metro-map.svg @@ -0,0 +1,4 @@ + + + +
[Figure content not reproducible in this diff: the added metro-map SVG shows the workflow Raw reads (Fastq) -> Subsample reads -> Filter Fastq -> Map reads (Reference Fasta, Bed coordinates) -> Split reads -> Extract UMIs -> Cluster UMIs -> Parse cluster -> Polish cluster -> Map consensus sequences -> Variant calling (Freebayes, Lofreq, Mutserve), with bam, vcf, tsv and log files as outputs and both consensus and re-polished consensus sequences.]
\ No newline at end of file diff --git a/docs/output.md b/docs/output.md index c9d4a12..597e48f 100644 --- a/docs/output.md +++ b/docs/output.md @@ -4,7 +4,7 @@ This document describes the output produced by the pipeline. ## Pipeline overview The pipeline is built using [Nextflow](https://www.nextflow.io/) and processes data using the following steps: -![Workflow](images/Flowchart_UMI_nf.jpg) +![Workflow](images/Flowchart.png) * [Merge and Filter Reads](#merge-and-filter-reads) - Merge and filter input fastq files * [Subsample Reads](#subsampling) - Subsample merged and filtered reads diff --git a/env/Dockerfile b/env/Dockerfile index 01dfba7..3af161b 100644 --- a/env/Dockerfile +++ b/env/Dockerfile @@ -1,27 +1,23 @@ -FROM ubuntu:18.04 +FROM ubuntu:22.04 + LABEL authors="Amstler Stephan" \ email="amstler.stephan@i-med.ac.at" -ENV PATH="/root/miniconda3/bin:${PATH}" -ARG PATH="/root/miniconda3/bin:${PATH}" -RUN apt-get update -RUN apt-get install -y wget && rm -rf /var/lib/apt/lists/* +COPY environment.yml . + +RUN apt-get update && \ + apt-get install -y wget && \ + rm -rf /var/lib/apt/lists/* -RUN wget \ - https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \ - && mkdir /root/.conda \ - && bash Miniconda3-latest-Linux-x86_64.sh -b \ - && rm -f Miniconda3-latest-Linux-x86_64.sh -RUN conda --version -RUN conda update -y conda +RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-py38_23.9.0-0-Linux-x86_64.sh -O ~/miniconda.sh && \ + /bin/bash ~/miniconda.sh -b -p /opt/conda +ENV PATH=/opt/conda/bin:${PATH} -COPY environment.yml . -RUN \ - conda env update -n root -f environment.yml \ -&& conda clean -a +RUN conda update -y conda && \ + conda env update -n root -f environment.yml && \ + conda clean --all -RUN mkdir /opt/mutserve -WORKDIR "/opt/mutserve" +WORKDIR "/opt" RUN wget https://github.com/seppinho/mutserve/releases/download/v2.0.0-rc15/mutserve.zip && \ unzip mutserve.zip ENV PATH="/opt/mutserve:${PATH}" diff --git a/env/environment.yml b/env/environment.yml index 44dddbe..bec01d1 100644 --- a/env/environment.yml +++ b/env/environment.yml @@ -5,6 +5,7 @@ channels: - defaults dependencies: - python>=3.8 + - medaka>=1.8.0 - minimap2=2.24 - samtools=1.15.1 - seqtk=1.3 @@ -13,7 +14,6 @@ dependencies: - vcflib - bedtools=2.30.0 - vsearch=2.21.2 - - medaka=1.7.0 - openjdk=11.0.9 - unzip=6.0 - pip=22.2.2 @@ -21,6 +21,6 @@ dependencies: - catfishq==1.4.0 - numpy==1.23.4 - pysam==0.19.1 + - pyfastx==1.1.0 - pandas==1.5.0 - edlib==1.3.9 - diff --git a/lib/processes/cluster.nf b/lib/processes/cluster.nf index 683e80e..724efdf 100644 --- a/lib/processes/cluster.nf +++ b/lib/processes/cluster.nf @@ -1,29 +1,27 @@ -centroid_fasta="clusters_centroid.fasta" -consensus_fasta="clusters_consensus.fasta" +consensus_fasta="consensus.fasta" vsearch_dir="vsearch_clusters" process CLUSTER { - publishDir "${params.output}/${sample}/clustering/${type}", mode: 'copy' - + publishDir "${params.output}/${sample}/clustering/${type}", pattern: "${consensus_fasta}", mode: 'copy' + input: - tuple val( sample ), val( target ), path( detected_umis_fasta ) + tuple val( sample ), val( target ), path( detected_umis_fastq ) val ( type ) output: tuple val( "${sample}" ), val( "${target}" ), path( "${consensus_fasta}" ), emit:consensus_fasta - path( "${vsearch_dir}" ), emit: vsearch_dir - path( "${centroid_fasta}" ) + tuple val( "${sample}" ), val( "${target}" ), path( "cluster*" ), emit:cluster_fastas + script: + def id = "${type}" == "raw" ? 
0.8 : 0.99 """ - mkdir -p ${vsearch_dir} && \ vsearch \ --clusterout_id \ - --clusters ${vsearch_dir}/cluster \ - --centroids ${centroid_fasta} \ + --clusters cluster \ --consout ${consensus_fasta} \ --minseqlength ${params.min_length} \ --maxseqlength ${params.max_length} \ --threads ${params.threads} \ - --cluster_fast ${detected_umis_fasta} \ + --cluster_fast ${detected_umis_fastq} \ --clusterout_sort \ --gapopen 0E/5I \ --gapext 0E/2I \ @@ -32,6 +30,6 @@ process CLUSTER { --iddef 0 \ --minwordmatches 0 \ --qmask none \ - --id 0.85 + --id $id """ -} \ No newline at end of file +} diff --git a/lib/processes/detect_umi_consensus_fasta.nf b/lib/processes/detect_umi_consensus_fasta.nf deleted file mode 100644 index 05e0dd4..0000000 --- a/lib/processes/detect_umi_consensus_fasta.nf +++ /dev/null @@ -1,25 +0,0 @@ -process DETECT_UMI_CONSENSUS_FASTA { - publishDir "${params.output}/${sample}/stats/${type}", pattern: "*.tsv", mode: 'copy' - publishDir "${params.output}/${sample}/fasta_umi/${type}", pattern: "*fasta", mode: 'copy' - - input: - tuple val( sample ), val( target ), path ( fasta ) - val ( type ) - path umi_extract_python - - output: - tuple val( "${sample}" ), val( "${fasta.baseName}" ), path ( "*fasta" ), emit: umi_extract_fasta - path "*.tsv" - - script: - def write_report = "${params.write_reports}" ? "--tsv" : "" - """ - python ${umi_extract_python} \ - --fwd-umi ${params.fwd_umi} \ - --rev-umi ${params.rev_umi} \ - --max-error ${params.umi_errors} \ - $write_report \ - -o . \ - ${fasta} - """ -} \ No newline at end of file diff --git a/lib/processes/detect_umi_fasta.nf b/lib/processes/detect_umi_fastq.nf similarity index 62% rename from lib/processes/detect_umi_fasta.nf rename to lib/processes/detect_umi_fastq.nf index 63ffc0f..0b46386 100644 --- a/lib/processes/detect_umi_fasta.nf +++ b/lib/processes/detect_umi_fastq.nf @@ -1,25 +1,26 @@ -process DETECT_UMI_FASTA { +process DETECT_UMI_FASTQ { publishDir "${params.output}/${sample}/stats/${type}", pattern: "*.tsv", mode: 'copy' publishDir "${params.output}/${sample}/${params.output_format}_umi/${type}", pattern: "*${params.output_format}", mode: 'copy' input: - tuple val( sample ), val( target ), path ( fasta ) + tuple val( sample ), val( target ), path ( fastq ) val ( type ) path umi_extract_python output: - tuple val( "${sample}" ), val( "${fasta.baseName}" ), path ( "*${params.output_format}" ), emit: umi_extract_fasta + tuple val( "${sample}" ), val( "${fastq.baseName}" ), path ( "*${params.output_format}" ), emit: umi_extract_fastq path "*.tsv" script: - def write_report = "${params.write_reports}" ? "--tsv" : "" + def write_report = params.write_reports ? "--tsv" : "" """ python ${umi_extract_python} \ --fwd-umi ${params.fwd_umi} \ --rev-umi ${params.rev_umi} \ --max-error ${params.umi_errors} \ + --adapter_length ${params.adapter_length} \ --output_format ${params.output_format} \ $write_report \ - -o . ${fasta} + -o . 
${fastq} """ } \ No newline at end of file diff --git a/lib/processes/filter_consensus_fastq.nf b/lib/processes/filter_consensus_fastq.nf new file mode 100644 index 0000000..8992ca4 --- /dev/null +++ b/lib/processes/filter_consensus_fastq.nf @@ -0,0 +1,26 @@ +filtered_fastq="masked_consensus.fastq" +process FILTER_CONSENSUS_FASTQ { + publishDir "${params.output}/${sample}/fastq/${type}", mode: 'copy' + + input: + tuple val( sample ), val( target ), path( merged_consensus_fastq ) + val ( type ) + + when: + params.output_format == "fastq" + + output: + tuple val( "${sample}" ), val( "${target}" ), path( "${filtered_fastq}" ), emit: filtered_consensus_fastq + + script: + def masking_strategy = params.masking_strategy == "hardmask" ? "--hardmask" : "" + """ + vsearch \ + --fastx_mask ${merged_consensus_fastq} \ + --fastq_qmax 90 \ + --fastq_qmin ${params.min_consensus_quality} \ + $masking_strategy \ + --fastqout ${filtered_fastq} + """ + // fastq_qmax hardcoded to 90 (maximal possible Q-score value) to have no upper limit of quality +} \ No newline at end of file diff --git a/lib/processes/map_reads.nf b/lib/processes/map_reads.nf index 36e68ec..33a1c37 100644 --- a/lib/processes/map_reads.nf +++ b/lib/processes/map_reads.nf @@ -2,7 +2,7 @@ process MAP_READS { publishDir "${params.output}/${sample}/align/${type}", mode: 'copy' input: - tuple val( sample ), val( target ), path( consensus_fasta ) + tuple val( sample ), val( target ), path( consensus_fastq ) val( type ) path reference output: @@ -14,12 +14,12 @@ process MAP_READS { ${params.minimap2_param} \ -t ${params.threads} \ ${reference} \ - ${consensus_fasta} | + ${consensus_fastq} | samtools sort \ -@ ${params.threads} \ - -o ${consensus_fasta.baseName}.bam - && \ + -o ${consensus_fastq.baseName}.bam - && \ samtools index \ -@ ${params.threads} \ - ${consensus_fasta.baseName}.bam + ${consensus_fastq.baseName}.bam """ } \ No newline at end of file diff --git a/lib/processes/merge_consensus_fasta.nf b/lib/processes/merge_consensus_fasta.nf deleted file mode 100644 index 54d7c93..0000000 --- a/lib/processes/merge_consensus_fasta.nf +++ /dev/null @@ -1,14 +0,0 @@ -merged_fasta="merged_consensus.fasta" -process MERGE_CONSENSUS_FASTA { - - input: - tuple val( sample ), val( target ), path( consensus_fastas ) - - output: - tuple val( "${sample}" ), val( "${target}" ), path( "${merged_fasta}"), emit: merged_consensus_fasta - - script: - """ - cat ${consensus_fastas} > ${merged_fasta} - """ -} \ No newline at end of file diff --git a/lib/processes/merge_consensus_fastq.nf b/lib/processes/merge_consensus_fastq.nf new file mode 100644 index 0000000..17136b6 --- /dev/null +++ b/lib/processes/merge_consensus_fastq.nf @@ -0,0 +1,16 @@ +merged_fastq="merged_consensus.fastq" +process MERGE_CONSENSUS_FASTQ { + publishDir "${params.output}/${sample}/fastq/${type}", mode: 'copy' + + input: + tuple val( sample ), val( target ), path( consensus_fastqs ) + val ( type ) + + output: + tuple val( "${sample}" ), val( "${target}" ), path( "${merged_fastq}"), emit: merged_consensus_fastq + + script: + """ + cat ${consensus_fastqs} > ${merged_fastq} + """ +} \ No newline at end of file diff --git a/lib/processes/polish_cluster.nf b/lib/processes/polish_cluster.nf index 9bec1f0..0ae9ca5 100644 --- a/lib/processes/polish_cluster.nf +++ b/lib/processes/polish_cluster.nf @@ -1,23 +1,26 @@ +cpus=2 process POLISH_CLUSTER { + cpus "${cpus}" tag "${sample}" input: - tuple val( sample ), val( target ), path( smolecule_clusters_fasta ) + tuple val( sample ), val( target ), 
path( smolecule_clusters_fastq ) val ( type ) output: - tuple val( "${sample}" ), val( "${target}" ), path( "${smolecule_clusters_fasta.baseName}_consensus.fasta" ), emit: consensus_fasta + tuple val( "${sample}" ), val( "${target}" ), path( "${smolecule_clusters_fastq.baseName}_consensus.fastq" ), emit: consensus_fastq script: """ medaka smolecule \ - --threads ${params.threads} \ + --threads $cpus \ --length 50 \ --depth 2 \ --model ${params.medaka_model} \ --method spoa . \ - ${smolecule_clusters_fasta} 2> smolecule.log + --qualities \ + ${smolecule_clusters_fastq} 2> smolecule.log - mv consensus.fasta ${smolecule_clusters_fasta.baseName}_consensus.fasta + mv consensus.fastq ${smolecule_clusters_fastq.baseName}_consensus.fastq """ } diff --git a/lib/processes/reformat_consensus_cluster.nf b/lib/processes/reformat_consensus_cluster.nf index 58346b3..a85a002 100644 --- a/lib/processes/reformat_consensus_cluster.nf +++ b/lib/processes/reformat_consensus_cluster.nf @@ -1,5 +1,5 @@ process REFORMAT_CONSENSUS_CLUSTER { - publishDir "${params.output}/${sample}/fasta/${type}", mode: 'copy' + publishDir "${params.output}/${sample}/fastq/${type}", mode: 'copy' input: tuple val( sample ), val( target ), path( cluster_consensus_fasta ) @@ -7,10 +7,13 @@ process REFORMAT_CONSENSUS_CLUSTER { path umi_reformat_consensus output: - tuple val( "${sample}" ), val( "${target}" ), path( "${type}.fasta" ), emit:consensus_fasta + tuple val( "${sample}" ), val( "${target}" ), path( "*.fastq" ), emit:consensus_fastq script: """ - cat ${cluster_consensus_fasta} | python "${umi_reformat_consensus}" > ${type}.fasta + python ${umi_reformat_consensus} \ + --consensus_fasta ${cluster_consensus_fasta} \ + -o . + """ } \ No newline at end of file diff --git a/lib/processes/reformat_filter_cluster.nf b/lib/processes/reformat_filter_cluster.nf index 75c237d..a65cbb1 100644 --- a/lib/processes/reformat_filter_cluster.nf +++ b/lib/processes/reformat_filter_cluster.nf @@ -1,18 +1,17 @@ process REFORMAT_FILTER_CLUSTER { - publishDir "${params.output}/${sample}/stats/${type}", pattern: "*.tsv", mode: 'copy' - publishDir "${params.output}/${sample}/clustering/${type}", pattern: "smolecule*", mode: 'copy' - publishDir "${params.output}/${sample}/clustering/${type}/clusters", pattern: "cluster*.${params.output_format}", mode: 'copy' - + tag "${sample}" + // publishDir "${params.output}/${sample}/clustering/${type}/smolecule", pattern: "smolecule*", mode: 'copy' + publishDir "${params.output}/${sample}/stats/${type}", pattern: "*tsv", mode: 'copy' + input: - tuple val(sample), val(target), path(consensus_fasta) + tuple val( sample ), val( target ), path( cluster ) val( type ) - path vsearch_dir path umi_parse_clusters_python output: - tuple val( "${sample}" ), val( "${target}" ), path( "smolecule*"), emit: smolecule_clusters_fastas - path ("*${params.output_format}") - path( "*.tsv" ) + tuple val( "${sample}" ), val( "${target}" ), path( "smolecule*"), optional: true, emit: smolecule_cluster_fastqs + tuple val( sample ), val ( target ), path( "*.tsv" ), optional: true + tuple val( "${sample}" ), val( "${target}" ), path( "cluster*"), optional: true script: def balance_strands = params.balance_strands ? 
"--balance_strands" : "" @@ -22,11 +21,11 @@ process REFORMAT_FILTER_CLUSTER { --filter_strategy ${params.filter_strategy_clusters} \ --min_reads_per_clusters ${params.min_reads_per_cluster} \ --max_reads_per_clusters ${params.max_reads_per_cluster} \ - --vsearch_consensus ${consensus_fasta} \ - --vsearch_folder ${vsearch_dir} \ + --max_dist_umi ${params.max_dist_umi} \ + --cluster ${cluster} \ --output_format ${params.output_format} \ $balance_strands \ $write_report \ -o . """ -} \ No newline at end of file +} diff --git a/lib/processes/split_reads.nf b/lib/processes/split_reads.nf index 5af66e0..a626440 100644 --- a/lib/processes/split_reads.nf +++ b/lib/processes/split_reads.nf @@ -14,12 +14,13 @@ process SPLIT_READS { path "*${params.output_format}" script: - def include_secondary_reads = "${params.include_secondary_reads}" ? "--include_secondary_reads" : "" - def write_report = "${params.write_reports}" ? "--tsv" : "" + def include_secondary_reads = params.include_secondary_reads ? "--include_secondary_reads" : "" + def write_report = params.write_reports ? "--tsv" : "" """ python ${python_filter_reads} \ --min_overlap ${params.min_overlap} \ --output_format ${params.output_format} \ + --adapter_length ${params.adapter_length} \ $include_secondary_reads \ $write_report \ -o . ${bed} \ diff --git a/lib/processes/variant_calling/freebayes.nf b/lib/processes/variant_calling/freebayes.nf index 7993956..ed41f44 100644 --- a/lib/processes/variant_calling/freebayes.nf +++ b/lib/processes/variant_calling/freebayes.nf @@ -1,5 +1,5 @@ process FREEBAYES { - publishDir "${params.output}/${params.variant_caller}/${sample}/", mode: 'copy' + publishDir "${params.output}/${params.variant_caller}/${sample}/${type}", mode: 'copy' input: tuple val( sample ), val( target ), path( bam ), path( bai ) @@ -12,8 +12,6 @@ process FREEBAYES { script: """ - freebayes --version - freebayes -f ${reference} ${bam} | vcfallelicprimitives -kg >${type}.vcf - + freebayes -f ${reference} -F 0.009 -p 80 ${bam} | vcfallelicprimitives -kg >${type}.vcf """ } diff --git a/lib/processes/variant_calling/lofreq.nf b/lib/processes/variant_calling/lofreq.nf index ec70ac9..49f7499 100644 --- a/lib/processes/variant_calling/lofreq.nf +++ b/lib/processes/variant_calling/lofreq.nf @@ -1,5 +1,5 @@ process LOFREQ { - publishDir "${params.output}/${params.variant_caller}/${sample}/", mode: 'copy' + publishDir "${params.output}/${params.variant_caller}/${sample}/${type}", mode: 'copy' input: tuple val( sample ), val( target ), path( bam ), path( bai ) diff --git a/lib/processes/variant_calling/mutserve.nf b/lib/processes/variant_calling/mutserve.nf index fe36994..79979a3 100644 --- a/lib/processes/variant_calling/mutserve.nf +++ b/lib/processes/variant_calling/mutserve.nf @@ -1,5 +1,5 @@ process MUTSERVE { - publishDir "${params.output}/${params.variant_caller}/${sample}/", mode: 'copy' + publishDir "${params.output}/${params.variant_caller}/${sample}/${type}", mode: 'copy' input: tuple val( sample ), val( target ), path( bam ), path( bai ) @@ -18,6 +18,7 @@ process MUTSERVE { --output ${type}.vcf \ --write-raw \ --reference ${reference} \ + --insertions \ --deletions \ --contig-name \$(awk '{ print \$1 }' ${bed}) \ ${bam} diff --git a/lib/workflows/umi-pipeline.nf b/lib/workflows/umi-pipeline.nf index d5cccb8..35bf8f5 100644 --- a/lib/workflows/umi-pipeline.nf +++ b/lib/workflows/umi-pipeline.nf @@ -25,7 +25,7 @@ umi_reformat_consensus = file( "${projectDir}/bin/reformat_consensus.py", c raw = "raw" consensus = "consensus" 
final_consensus = "final" -barcode_sizes = [:] +n_parsed_cluster = [:] // STAGE CHANNELS @@ -41,19 +41,20 @@ Channel.fromPath("${params.input}/*", type: 'dir') include {COPY_BED} from '../processes/copy_bed.nf' include {MERGE_FASTQ} from '../processes/merge_input.nf' -include {MERGE_CONSENSUS_FASTA} from '../processes/merge_consensus_fasta.nf' +include {MERGE_CONSENSUS_FASTQ} from '../processes/merge_consensus_fastq.nf' include {SUBSAMPLING} from '../processes/subsampling.nf' include {MAP_READS; MAP_READS as MAP_CONSENSUS; MAP_READS as MAP_FINAL_CONSENSUS} from '../processes/map_reads.nf' include {SPLIT_READS} from '../processes/split_reads.nf' -include {DETECT_UMI_FASTA} from '../processes/detect_umi_fasta.nf' +include {DETECT_UMI_FASTQ; DETECT_UMI_FASTQ as DETECT_UMI_CONSENSUS_FASTQ} from '../processes/detect_umi_fastq.nf' include {CLUSTER; CLUSTER as CLUSTER_CONSENSUS} from '../processes/cluster.nf' include {REFORMAT_FILTER_CLUSTER} from '../processes/reformat_filter_cluster.nf' include {POLISH_CLUSTER} from '../processes/polish_cluster.nf' -include {DETECT_UMI_CONSENSUS_FASTA} from '../processes/detect_umi_consensus_fasta.nf' +include {FILTER_CONSENSUS_FASTQ} from '../processes/filter_consensus_fastq.nf' include {REFORMAT_CONSENSUS_CLUSTER} from '../processes/reformat_consensus_cluster.nf' -include {LOFREQ} from '../processes/variant_calling/lofreq.nf' -include {MUTSERVE} from '../processes/variant_calling/mutserve.nf' -include {FREEBAYES} from '../processes/variant_calling/freebayes.nf' +include {LOFREQ as LOFREQ_CONSENSUS; LOFREQ as LOFREQ_FINAL_CONSENSUS} from '../processes/variant_calling/lofreq.nf' +include {MUTSERVE as MUTSERVE_CONSENSUS; MUTSERVE as MUTSERVE_FINAL_CONSENSUS} from '../processes/variant_calling/mutserve.nf' +include {FREEBAYES as FREEBAYES_CONSENSUS; FREEBAYES as FREEBAYES_FINAL_CONSENSUS} from '../processes/variant_calling/freebayes.nf' + // SUB-WORKFLOWS workflow UMI_PIPELINE { @@ -64,11 +65,9 @@ workflow UMI_PIPELINE { if( params.subsampling ){ MERGE_FASTQ( fastq_files_ch ) SUBSAMPLING( MERGE_FASTQ.out.merged_fastq ) - SUBSAMPLING.out.subsampled_fastq .set { merged_fastq } } else { MERGE_FASTQ( fastq_files_ch ) - MERGE_FASTQ.out.merged_fastq .set { merged_fastq } } @@ -78,39 +77,56 @@ workflow UMI_PIPELINE { MAP_READS( merged_filtered_fastq, raw, reference ) SPLIT_READS( MAP_READS.out.bam_consensus, COPY_BED.out.bed, raw, umi_filter_reads ) - DETECT_UMI_FASTA( SPLIT_READS.out.split_reads_fastx, raw, umi_extract ) - CLUSTER( DETECT_UMI_FASTA.out.umi_extract_fasta, raw ) - REFORMAT_FILTER_CLUSTER( CLUSTER.out.consensus_fasta, raw, CLUSTER.out.vsearch_dir, umi_parse_clusters) - - REFORMAT_FILTER_CLUSTER.out.smolecule_clusters_fastas - .map{ sample, type, fastas -> barcode_sizes.put("$sample", fastas.size)} + DETECT_UMI_FASTQ( SPLIT_READS.out.split_reads_fastx, raw, umi_extract ) + CLUSTER( DETECT_UMI_FASTQ.out.umi_extract_fastq, raw ) - REFORMAT_FILTER_CLUSTER.out.smolecule_clusters_fastas - .transpose(by: 2) - .set { flatten_smolecule_fastas } + REFORMAT_FILTER_CLUSTER( CLUSTER.out.cluster_fastas, raw, umi_parse_clusters ) + + REFORMAT_FILTER_CLUSTER.out.smolecule_cluster_fastqs + .filter{ sample, type, fastqs -> fastqs.class == ArrayList} + .set{ smolecule_cluster_fastqs_list } - POLISH_CLUSTER( flatten_smolecule_fastas, consensus ) + smolecule_cluster_fastqs_list + .map{ sample, type, fastqs -> n_parsed_cluster.put("$sample", fastqs.size)} + + smolecule_cluster_fastqs_list + .transpose( by: 2 ) + .set{ smolecule_cluster_fastqs } - 
POLISH_CLUSTER.out.consensus_fasta - .map{ sample, type, fasta -> tuple( groupKey(sample, barcode_sizes.get("$sample")), type, fasta) } - .groupTuple() - .set { merge_consensus } + POLISH_CLUSTER( smolecule_cluster_fastqs, consensus ) + + POLISH_CLUSTER.out.consensus_fastq + .map{ sample, type, fastq -> tuple( groupKey(sample, n_parsed_cluster.get("$sample")), type, fastq) } + .groupTuple( ) + .set{ merge_consensus } - MERGE_CONSENSUS_FASTA(merge_consensus) - MAP_CONSENSUS( MERGE_CONSENSUS_FASTA.out.merged_consensus_fasta, consensus, reference ) - DETECT_UMI_CONSENSUS_FASTA( MERGE_CONSENSUS_FASTA.out.merged_consensus_fasta, consensus, umi_extract ) - CLUSTER_CONSENSUS( DETECT_UMI_CONSENSUS_FASTA.out.umi_extract_fasta , consensus ) + if ( params.output_format == "fastq"){ + MERGE_CONSENSUS_FASTQ(merge_consensus, consensus) + FILTER_CONSENSUS_FASTQ(MERGE_CONSENSUS_FASTQ.out.merged_consensus_fastq, consensus) + FILTER_CONSENSUS_FASTQ.out.filtered_consensus_fastq + .set{ consensus_fastq } + } else { + MERGE_CONSENSUS_FASTQ(merge_consensus, consensus) + .set{ consensus_fastq } + } + + MAP_CONSENSUS( consensus_fastq, consensus, reference ) + DETECT_UMI_CONSENSUS_FASTQ( consensus_fastq, consensus, umi_extract ) + CLUSTER_CONSENSUS( DETECT_UMI_CONSENSUS_FASTQ.out.umi_extract_fastq , consensus ) REFORMAT_CONSENSUS_CLUSTER( CLUSTER_CONSENSUS.out.consensus_fasta, final_consensus, umi_reformat_consensus ) - MAP_FINAL_CONSENSUS( REFORMAT_CONSENSUS_CLUSTER.out.consensus_fasta, final_consensus, reference ) + MAP_FINAL_CONSENSUS( REFORMAT_CONSENSUS_CLUSTER.out.consensus_fastq, final_consensus, reference ) if( params.call_variants ){ if( params.variant_caller == "lofreq" ){ - LOFREQ( MAP_FINAL_CONSENSUS.out.bam_consensus, final_consensus, reference, reference_fai ) + LOFREQ_CONSENSUS( MAP_CONSENSUS.out.bam_consensus, consensus, reference, reference_fai ) + LOFREQ_FINAL_CONSENSUS( MAP_FINAL_CONSENSUS.out.bam_consensus, final_consensus, reference, reference_fai ) }else if( params.variant_caller == "mutserve"){ - MUTSERVE( MAP_FINAL_CONSENSUS.out.bam_consensus, final_consensus, COPY_BED.out.bed, reference, reference_fai ) + MUTSERVE_CONSENSUS( MAP_CONSENSUS.out.bam_consensus, consensus, COPY_BED.out.bed, reference, reference_fai ) + MUTSERVE_FINAL_CONSENSUS( MAP_FINAL_CONSENSUS.out.bam_consensus, final_consensus, COPY_BED.out.bed, reference, reference_fai ) }else if( params.variant_caller == "freebayes"){ - FREEBAYES( MAP_FINAL_CONSENSUS.out.bam_consensus, final_consensus, reference, reference_fai ) + FREEBAYES_CONSENSUS( MAP_CONSENSUS.out.bam_consensus, consensus, reference, reference_fai ) + FREEBAYES_FINAL_CONSENSUS( MAP_FINAL_CONSENSUS.out.bam_consensus, final_consensus, reference, reference_fai ) }else{ exit 1, "${params.variant_caller} is not a valid option. \nPossible variant caller are " diff --git a/main.nf b/main.nf index f89294e..05fc395 100644 --- a/main.nf +++ b/main.nf @@ -11,70 +11,50 @@ if(params.help){ Usage: nextflow run AmstlerStephan/umi-pipeline-nf [OPTIONS]... 
diff --git a/main.nf b/main.nf
index f89294e..05fc395 100644
--- a/main.nf
+++ b/main.nf
@@ -11,70 +11,50 @@ if(params.help){
     Usage: nextflow run AmstlerStephan/umi-pipeline-nf [OPTIONS]...
 
-    Options: GENERAL
-    --input [path/to/input/dir]     [REQUIRED] Input directory containing (zipped) FASTQ files
+    Options: BASIC PARAMETERS
+    --help                          Display this help information and exit
+    --version                       Display the current pipeline version and exit
+    --debug                         Run the pipeline in debug mode
+
+    Options: GENERAL - Required
+    --input [path/to/input/dir]     [REQUIRED] Input directory containing (zipped) FASTQ files
+    --output [STR]                  [REQUIRED] A string that can be given to name the output directory
     --reference [path/to/ref.fa]    [REQUIRED] Path to the reference genome in fasta format
-
     --reference_fai [path/to/fai]   [REQUIRED] Path to the reference index file
-
     --bed [path/to/data.bed]        [REQUIRED] Path to the bed file
-
-    --output [STR]                  [REQUIRED] A string that can be given to name the output directory
-
     --threads                       Number of maximum threads to use [default: availableProcessors -1]
 
-    Options: READ FILTERING
+    Options: READ FILTERING
     --min_read_length               flag to enable subsampling [default: 0]
-
     --min_qscore                    Seed to produce pseudorandom numbers [default: 0]
-
+    Options: SUBSAMPLING
     --subsampling                   flag to enable subsampling [default: false]
-
     --subsampling_seed              Seed to produce pseudorandom numbers [default: 11]
-
     --subsampling_readnumber        Number of reads after subsampling [default: 100000]
 
-    Options: VARIANT_CALLING
+    Options: VARIANT CALLING
     --call_variants                 flag to enable variant calling [default: false]
-
     --variant_caller [STR]          [REQUIRED if call_variants is set] Variant caller [lofreq | mutserve | freebayes ]
 
-    Options: ADVANCED
-    --min_reads_per_barcode         Minimal number of fastq reads for each barcode [default: 100]
-
+    Options: ADVANCED
+    --min_reads_per_barcode         Minimal number of fastq reads for each barcode [default: 1000]
     --umi_errors                    Max differences between extracted UMIs of the read and UMI pattern [default: 3]
-
     --min_reads_per_cluster         Min number of raw reads required for a consensus read [default: 20]
-
     --max_reads_per_cluster         Max number of raw reads used for a consensus read [default: 60]
-
-    --filter_strategy_clusters      Filtering strategy for clusters with more than max_reads_per_cluster reads [random | quality] [default: random]
-
+    --filter_strategy_clusters      Filtering strategy for clusters with more than max_reads_per_cluster reads [random | quality] [default: quality]
     --output_format                 Output format until the cluster filtering step [fasta | fastq] [default: fastq]
-
     --write_reports                 Write stats of cluster and cluster filtering [default: true]
-
     --min_overlap                   Min overlap with target region [default: 0.90]
-
     --balance_strands               Balance forward and reverse raw reads in clusters [default: true]
-
     --medaka_model                  Medaka model used to compute consensus reads [default: "r1041_e82_400bps_hac_g615"]
-
-    --fwd_universal_primer         Forward tail of primer (Ftail...UMI...primer) [default: "GTATCGTGTAGAGACTGCGTAGG"]
-
-    --rev_universal_primer         Reverse tail of primer (Rtail...UMI...primer) [default: "AGTGATCGAGTCAGTGCGAGTG"]
-
     --fwd_umi                       Forward UMI (Ftail...UMI...primer) [default: "TTTVVVVTTVVVVTTVVVVTTVVVVTTT"]
-
     --rev_umi                       Reverse UMI (Rtail...UMI...primer) [default: "AAABBBBAABBBBAABBBBAABBBBAAA"]
-
     --min_length                    Minimum combined UMI length [default: 40]
-
     --max_length                    Maximum combined UMI length [default: 60]
+    --minimap2_param                Set the parameters for minimap2 [default: "-ax map-ont -k 13"]
+    --include_secondary_reads       Include secondary reads in the analysis [default: false]
-
-    --minimap_param                 Set the parameters for minimap2 [default: "-ax map-ont -k 13"]
 
     Options: ADDITIONAL
     --help                          Display this help information and exit
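The reorganized help text above maps directly onto the pipeline parameters; in practice the required ones are usually set in a user config rather than on the command line. A hedged sketch of what such a config could contain (all paths and values below are placeholders, not shipped defaults):

```nextflow
// Hypothetical user config (e.g. what config/custom.config could look like);
// the file paths are placeholders and must point to your own data.
params {
    input          = "path/to/fastq_pass"          // directory containing (zipped) FASTQ files
    output         = "umi_pipeline_results"        // name of the output directory
    reference      = "path/to/reference.fasta"
    reference_fai  = "path/to/reference.fasta.fai"
    bed            = "path/to/target_region.bed"

    call_variants  = true                          // optional variant calling on the consensus reads
    variant_caller = "lofreq"                      // lofreq | mutserve | freebayes
}
```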
diff --git a/nextflow.config b/nextflow.config
index da5d9c8..60f1cfe 100644
--- a/nextflow.config
+++ b/nextflow.config
@@ -12,111 +12,89 @@ manifest {
 
 // DEFAULT PARAMETERS
 params {
-    // BASIC PARAMS
-    help = false
-    version = false
-    debug = false
-
-    // GENERAL - Required
-    input = null
-    output = null
-    reference = null
-    reference_fai = null
-    bed = null
-
-    //READ FILTERING
-    min_read_length = 0
-
-    min_qscore = 0
-
-    // SUBSAMPLING
-    subsampling = false
-
-    subsampling_seed = 11
-
-    subsampling_readnumber = 100000
-
-    // VARIANT_CALLING
-    call_variants = false
-
-    variant_caller = null
-
-    // ADVANCED
-    min_reads_per_barcode = 1000
-
-    umi_errors = 3
-
-    min_reads_per_cluster = 20
-
-    max_reads_per_cluster = 60
-
-    filter_strategy_clusters = "quality"
-
-    output_format = "fastq"
-
-    write_reports = true
-
-    threads = (Runtime.runtime.availableProcessors() - 1)
-
-    min_overlap = 0.90
-
-    include_secondary_reads = false
-
-    balance_strands = true
-
-    medaka_model = "r1041_e82_400bps_hac_g615"
-
-    fwd_umi = "TTTVVVVTTVVVVTTVVVVTTVVVVTTT"
-
-    rev_umi = "AAABBBBAABBBBAABBBBAABBBBAAA"
-
-    min_length = 40
-
-    max_length = 60
-
-    minimap2_param = "-ax map-ont -k 13"
+    // BASIC PARAMS
+    help = false
+    version = false
+    debug = false
+
+    // GENERAL - Required
+    input = null
+    output = null
+    reference = null
+    reference_fai = null
+    bed = null
+
+    //READ FILTERING
+    min_read_length = 0
+    min_qscore = 0
+
+    // SUBSAMPLING
+    subsampling = false
+    subsampling_seed = 11
+    subsampling_readnumber = 100000
+
+    // VARIANT_CALLING
+    call_variants = false
+    variant_caller = null
+
+    // ADVANCED
+    min_reads_per_barcode = 1000
+    umi_errors = 3
+    min_reads_per_cluster = 20
+    max_reads_per_cluster = 60
+    min_consensus_quality = 40
+    masking_strategy = "softmask"
+    filter_strategy_clusters = "quality"
+    output_format = "fastq"
+    write_reports = true
+    min_overlap = 0.95
+    include_secondary_reads = false
+    balance_strands = true
+    medaka_model = "r1041_e82_400bps_hac_g615"
+    fwd_umi = "TTTVVVVTTVVVVTTVVVVTTVVVVTTT"
+    rev_umi = "AAABBBBAABBBBAABBBBAABBBBAAA"
+    adapter_length = 200
+    min_length = 40
+    max_length = 60
+    minimap2_param = "-ax map-ont -k 13 --MD"
+    threads = (Runtime.runtime.availableProcessors() - 1)
 }
 
 // NEXTFLOW PROFILES
-profiles {
-
-    // -profile standard
-    standard {
-        includeConfig "${baseDir}/config/base.config"
-    }
-    // -profile conda
-    conda {
-        includeConfig "${baseDir}/config/base.config"
-        process.conda = "${baseDir}/env/environment.yml"
-    }
+// Load base.config by default for all pipelines
+includeConfig "${baseDir}/config/base.config"
-    // -profile docker
-    docker {
-        includeConfig "${baseDir}/config/base.config"
-        docker.enabled = true
-        process.container = 'quay.io/genepi/umi-pipeline-nf:v0.1.0'
+process.container = 'quay.io/genepi/umi-pipeline-nf:v0.1.0'
-    }
-
-    // -profile singularity
-    singularity {
-        includeConfig "${baseDir}/config/base.config"
-        singularity.enabled = true
-        process.container = 'umi-pipeline-nf.sif'
-    }
-
-    // -profile test
-    test {
-        includeConfig "${baseDir}/config/base.config"
-        includeConfig "${baseDir}/config/test.config"
-    }
+profiles {
-    // -profile custom
-    custom {
-        includeConfig "${baseDir}/config/base.config"
-        includeConfig "${baseDir}/config/custom.config"
-    }
+    // -profile conda
+    conda {
+        process.conda = "${baseDir}/env/environment.yml"
+    }
+
+    // -profile docker
+    docker {
+        docker.enabled = true
+    }
+
+    // -profile singularity
+    singularity {
+        singularity.enabled = true
+        singularity.autoMounts = true
+        docker.enabled = false
+    }
+
+    // -profile test
+    test {
+        includeConfig "${baseDir}/config/test.config"
+    }
+
+    // -profile custom
+    custom {
+        includeConfig "${baseDir}/config/custom.config"
+    }
 }
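Because `base.config` and `process.container` are now applied unconditionally, each profile only has to toggle the container engine or add site-specific settings. Purely as an illustration (the `slurm` profile below is hypothetical and not part of this diff), a new profile would follow the same flattened layout:

```nextflow
// Hypothetical addition to nextflow.config; 'slurm' is not a profile shipped with the pipeline.
profiles {

    // -profile slurm
    slurm {
        process.executor       = 'slurm'   // submit each process as a SLURM job
        singularity.enabled    = true      // reuse the container set globally via process.container
        singularity.autoMounts = true
    }
}
```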