From d9b433eb7b34a56da1457ffb7ac3944954d3f90a Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Fri, 6 Dec 2024 16:25:16 -0800 Subject: [PATCH] Further improve typing by replacing str with Path --- micall/drivers/sample.py | 2 +- micall/tests/test_fasta_to_csv.py | 18 ++++++------- micall/utils/contig_blaster.py | 3 ++- micall/utils/fasta_to_csv.py | 45 +++++++++++++------------------ 4 files changed, 30 insertions(+), 38 deletions(-) diff --git a/micall/drivers/sample.py b/micall/drivers/sample.py index c63c3353c..20e64fd09 100644 --- a/micall/drivers/sample.py +++ b/micall/drivers/sample.py @@ -429,7 +429,7 @@ def run_denovo(self, excluded_seeds): with open(self.unstitched_contigs_csv, 'w') as unstitched_contigs_csv, \ open(self.merged_contigs_csv, 'r') as merged_contigs_csv, \ open(self.blast_csv, 'w') as blast_csv: - fasta_to_csv(self.unstitched_contigs_fasta, + fasta_to_csv(Path(self.unstitched_contigs_fasta), unstitched_contigs_csv, merged_contigs_csv, blast_csv=blast_csv, diff --git a/micall/tests/test_fasta_to_csv.py b/micall/tests/test_fasta_to_csv.py index 0fe8a92c1..70edbf9d6 100644 --- a/micall/tests/test_fasta_to_csv.py +++ b/micall/tests/test_fasta_to_csv.py @@ -55,7 +55,7 @@ def test_genotype(tmpdir, hcv_db): 1,HCV-1a,41,1.0,100,1,41,8187,8227 """ - genotype(str(contigs_fasta), blast_csv=blast_csv) + genotype(contigs_fasta, blast_csv=blast_csv) assert expected_blast_csv == blast_csv.getvalue() @@ -75,7 +75,7 @@ def test_fasta_to_csv_two_sequences(tmpdir, hcv_db): HCV-1a,1.0,HCV-1a,CAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC """ - fasta_to_csv(str(contigs_fasta), contigs_csv) + fasta_to_csv(contigs_fasta, contigs_csv) assert expected_contigs_csv == contigs_csv.getvalue() @@ -98,7 +98,7 @@ def test_fasta_to_csv_two_groups(tmpdir, hcv_db): HCV-2b,1.0,HCV-2b,TGCAATGACAGCTTACAGACGGGTTTCCTCGCTTCCTTGTTTTACACCCA """ - fasta_to_csv(str(contigs_fasta), contigs_csv) + fasta_to_csv(contigs_fasta, contigs_csv) assert expected_contigs_csv == contigs_csv.getvalue() @@ -115,7 +115,7 @@ def test_fasta_to_csv_not_found(tmpdir, hcv_db): unknown,0,,CATCACATAGGAGA """ - fasta_to_csv(str(contigs_fasta), contigs_csv) + fasta_to_csv(contigs_fasta, contigs_csv) assert expected_contigs_csv == contigs_csv.getvalue() @@ -135,7 +135,7 @@ def test_fasta_to_csv_partial_match(tmpdir, hcv_db): HCV-1a,0.75,HCV-1a,CATCACATAGGAGACAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC """ - fasta_to_csv(str(contigs_fasta), contigs_csv) + fasta_to_csv(contigs_fasta, contigs_csv) assert expected_contigs_csv == contigs_csv.getvalue() @@ -156,7 +156,7 @@ def test_fasta_to_csv_reversed_match(tmpdir, hcv_db): HCV-1a,0.75,HCV-1a,CATCACATAGGAGACAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC """ - fasta_to_csv(str(contigs_fasta), contigs_csv) + fasta_to_csv(contigs_fasta, contigs_csv) assert expected_contigs_csv == contigs_csv.getvalue() @@ -183,7 +183,7 @@ def test_fasta_to_csv(tmpdir, hcv_db): 1,HCV-1a,41,1.0,100,1,41,8187,8227 """ - fasta_to_csv(str(contigs_fasta), contigs_csv, blast_csv=blast_csv) + fasta_to_csv(contigs_fasta, contigs_csv, blast_csv=blast_csv) assert expected_contigs_csv == contigs_csv.getvalue() assert expected_blast_csv == blast_csv.getvalue() @@ -198,7 +198,7 @@ def test_fasta_to_csv_none(tmpdir, hcv_db): ref,match,group_ref,contig """ - fasta_to_csv(str(contigs_fasta), contigs_csv) + fasta_to_csv(contigs_fasta, contigs_csv) assert expected_contigs_csv == contigs_csv.getvalue() @@ -220,7 +220,7 @@ def test_merged_contig(tmpdir, hcv_db): """ with merged_contigs_path.open() as merged_contigs_csv: - fasta_to_csv(str(contigs_fasta), + fasta_to_csv(contigs_fasta, contigs_csv, merged_contigs_csv=merged_contigs_csv) diff --git a/micall/utils/contig_blaster.py b/micall/utils/contig_blaster.py index ccdbacd0a..20403bb9c 100644 --- a/micall/utils/contig_blaster.py +++ b/micall/utils/contig_blaster.py @@ -4,6 +4,7 @@ from itertools import groupby from operator import itemgetter from tempfile import NamedTemporaryFile +from pathlib import Path from micall.utils.fasta_to_csv import fasta_to_csv @@ -44,7 +45,7 @@ def main(): fasta_file.flush() new_contigs_csv = StringIO() blast_csv = StringIO() - fasta_to_csv(fasta_file.name, new_contigs_csv, blast_csv=blast_csv) + fasta_to_csv(Path(fasta_file.name), new_contigs_csv, blast_csv=blast_csv) blast_csv.seek(0) for source_contig_num, contig_rows in groupby(DictReader(blast_csv), itemgetter('contig_num')): diff --git a/micall/utils/fasta_to_csv.py b/micall/utils/fasta_to_csv.py index fcd762ea6..6c52465fe 100644 --- a/micall/utils/fasta_to_csv.py +++ b/micall/utils/fasta_to_csv.py @@ -62,20 +62,20 @@ def default_database() -> Iterator[Path]: def read_assembled_contigs(group_refs: Dict[str, str], genotypes: Dict[str, typing.Tuple[str, float]], - contigs_fasta_path: str) -> Iterable[GenotypedContig]: + contigs_fasta: Path) -> Iterable[GenotypedContig]: """Read assembled contigs and generate GenotypedContig objects. Args: group_refs (Dict[str, str]): Mapping of reference names to group references. genotypes (Dict[str, Tuple[str, float]]): Mapping of contig names to (reference name, match fraction). - contigs_fasta_path (str): Path to the FASTA file containing contig sequences. + contigs_fasta (Path): Path to the FASTA file containing contig sequences. Returns: Iterable[GenotypedContig]: An iterable of GenotypedContig objects. """ projects = ProjectConfig.loadDefault() - for i, record in enumerate(SeqIO.parse(contigs_fasta_path, "fasta")): + for i, record in enumerate(SeqIO.parse(contigs_fasta, "fasta")): (ref_name, match_fraction) = genotypes.get(record.name, ('unknown', 0)) seq = record.seq if match_fraction < 0: @@ -118,29 +118,29 @@ def init_contigs_refs(contigs_csv: TextIO) -> DictWriter: def write_contigs(writer: DictWriter, group_refs: Dict[str, str], genotypes: Dict[str, typing.Tuple[str, float]], - contigs_fasta_path: str): + contigs_fasta: Path): """Write contigs to a CSV file. Args: writer (DictWriter): CSV writer to write contigs. group_refs (Dict[str, str]): Mapping of reference names to group references. genotypes (Dict[str, Tuple[str, float]]): Mapping of contig names to (reference name, match fraction). - contigs_fasta_path (str): Path to the FASTA file containing contig sequences. + contigs_fasta (Path): Path to the FASTA file containing contig sequences. """ - for contig in read_assembled_contigs(group_refs, genotypes, contigs_fasta_path): + for contig in read_assembled_contigs(group_refs, genotypes, contigs_fasta): writer.writerow(dict(ref=contig.ref_name, match=contig.match_fraction, group_ref=contig.group_ref, contig=contig.seq)) -def genotype(fasta: str, db: Optional[Path] = None, +def genotype(contigs_fasta: Path, db: Optional[Path] = None, blast_csv: Optional[TextIO] = None, group_refs: Optional[Dict[str, str]] = None) -> Dict[str, typing.Tuple[str, float]]: """Use Blastn to search for the genotype of a set of reference sequences. Args: - fasta (str): File path of the FASTA file containing the query sequences. + fasta (Path): File path of the FASTA file containing the query sequences. db (Optional[str]): File path of the database to search for matches. blast_csv (Optional[TextIO]): Open file to write the blast matches to, or None. group_refs (Optional[Dict[str, str]]): Dictionary to fill with the mapping from @@ -151,25 +151,16 @@ def genotype(fasta: str, db: Optional[Path] = None, """ contig_nums: Dict[str, int] = {} # {contig_name: contig_num} - with open(fasta) as f: + with open(contigs_fasta) as f: for line in f: if line.startswith('>'): contig_name = line[1:-1] contig_nums[contig_name] = len(contig_nums) + 1 - blast_columns = ['qaccver', - 'saccver', - 'pident', - 'score', - 'qcovhsp', - 'qstart', - 'qend', - 'sstart', - 'send'] def invoke_blast(db: Path) -> str: return Blastn().genotype( - contigs_fasta=Path(fasta), - database=Path(db), + contigs_fasta=contigs_fasta, + database=db, ) if db is None: @@ -179,7 +170,7 @@ def invoke_blast(db: Path) -> str: stdout = invoke_blast(db) samples = {} # {query_name: (subject_name, matched_fraction)} - matches = sorted(DictReader(StringIO(stdout), blast_columns), + matches = sorted(DictReader(StringIO(stdout), Blastn.BLAST_COLUMNS), key=lambda row: (row['qaccver'], float(row['score']))) if not blast_csv: blast_writer = None @@ -238,32 +229,32 @@ def invoke_blast(db: Path) -> str: return samples -def fasta_to_csv(contigs_fasta_path: str, +def fasta_to_csv(contigs_fasta: Path, contigs_csv: TextIO, merged_contigs_csv: Optional[TextIO] = None, blast_csv: Optional[TextIO] = None) -> None: """Run BLAST search to identify contig sequences and write them to CSV. Args: - contigs_fasta_path (str): Path to the FASTA file containing contig sequences. + contigs_fasta (Path): Path to the FASTA file containing contig sequences. contigs_csv (TextIO): Open file to write assembled contigs to. merged_contigs_csv: open file to read contigs that were merged from amplicon reads. blast_csv (Optional[TextIO]): Open file to write BLAST search results for each contig. """ - with open(contigs_fasta_path, 'a') as contigs_fasta: + with open(contigs_fasta, 'a') as contigs_fasta_writer: if merged_contigs_csv is not None: contig_reader = DictReader(merged_contigs_csv) for i, row in enumerate(contig_reader, 1): contig_name = f'merged-contig-{i}' - contigs_fasta.write(f">{contig_name}\n{row['contig']}\n") + contigs_fasta_writer.write(f">{contig_name}\n{row['contig']}\n") writer = init_contigs_refs(cast(TextIO, contigs_csv)) group_refs: Dict[str, str] = {} - genotypes = genotype(contigs_fasta_path, blast_csv=blast_csv, group_refs=group_refs) + genotypes = genotype(contigs_fasta, blast_csv=blast_csv, group_refs=group_refs) - write_contigs(writer, group_refs, genotypes, contigs_fasta_path) + write_contigs(writer, group_refs, genotypes, contigs_fasta) contigs_csv.flush()