Skip to content

Commit

Permalink
Further improve typing by replacing str with Path
Browse files Browse the repository at this point in the history
  • Loading branch information
Donaim committed Dec 7, 2024
1 parent 199f3a5 commit d9b433e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 38 deletions.
2 changes: 1 addition & 1 deletion micall/drivers/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ def run_denovo(self, excluded_seeds):
with open(self.unstitched_contigs_csv, 'w') as unstitched_contigs_csv, \
open(self.merged_contigs_csv, 'r') as merged_contigs_csv, \
open(self.blast_csv, 'w') as blast_csv:
fasta_to_csv(self.unstitched_contigs_fasta,
fasta_to_csv(Path(self.unstitched_contigs_fasta),
unstitched_contigs_csv,
merged_contigs_csv,
blast_csv=blast_csv,
Expand Down
18 changes: 9 additions & 9 deletions micall/tests/test_fasta_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_genotype(tmpdir, hcv_db):
1,HCV-1a,41,1.0,100,1,41,8187,8227
"""

genotype(str(contigs_fasta), blast_csv=blast_csv)
genotype(contigs_fasta, blast_csv=blast_csv)

assert expected_blast_csv == blast_csv.getvalue()

Expand All @@ -75,7 +75,7 @@ def test_fasta_to_csv_two_sequences(tmpdir, hcv_db):
HCV-1a,1.0,HCV-1a,CAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC
"""

fasta_to_csv(str(contigs_fasta), contigs_csv)
fasta_to_csv(contigs_fasta, contigs_csv)

assert expected_contigs_csv == contigs_csv.getvalue()

Expand All @@ -98,7 +98,7 @@ def test_fasta_to_csv_two_groups(tmpdir, hcv_db):
HCV-2b,1.0,HCV-2b,TGCAATGACAGCTTACAGACGGGTTTCCTCGCTTCCTTGTTTTACACCCA
"""

fasta_to_csv(str(contigs_fasta), contigs_csv)
fasta_to_csv(contigs_fasta, contigs_csv)

assert expected_contigs_csv == contigs_csv.getvalue()

Expand All @@ -115,7 +115,7 @@ def test_fasta_to_csv_not_found(tmpdir, hcv_db):
unknown,0,,CATCACATAGGAGA
"""

fasta_to_csv(str(contigs_fasta), contigs_csv)
fasta_to_csv(contigs_fasta, contigs_csv)

assert expected_contigs_csv == contigs_csv.getvalue()

Expand All @@ -135,7 +135,7 @@ def test_fasta_to_csv_partial_match(tmpdir, hcv_db):
HCV-1a,0.75,HCV-1a,CATCACATAGGAGACAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC
"""

fasta_to_csv(str(contigs_fasta), contigs_csv)
fasta_to_csv(contigs_fasta, contigs_csv)

assert expected_contigs_csv == contigs_csv.getvalue()

Expand All @@ -156,7 +156,7 @@ def test_fasta_to_csv_reversed_match(tmpdir, hcv_db):
HCV-1a,0.75,HCV-1a,CATCACATAGGAGACAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC
"""

fasta_to_csv(str(contigs_fasta), contigs_csv)
fasta_to_csv(contigs_fasta, contigs_csv)

assert expected_contigs_csv == contigs_csv.getvalue()

Expand All @@ -183,7 +183,7 @@ def test_fasta_to_csv(tmpdir, hcv_db):
1,HCV-1a,41,1.0,100,1,41,8187,8227
"""

fasta_to_csv(str(contigs_fasta), contigs_csv, blast_csv=blast_csv)
fasta_to_csv(contigs_fasta, contigs_csv, blast_csv=blast_csv)

assert expected_contigs_csv == contigs_csv.getvalue()
assert expected_blast_csv == blast_csv.getvalue()
Expand All @@ -198,7 +198,7 @@ def test_fasta_to_csv_none(tmpdir, hcv_db):
ref,match,group_ref,contig
"""

fasta_to_csv(str(contigs_fasta), contigs_csv)
fasta_to_csv(contigs_fasta, contigs_csv)

assert expected_contigs_csv == contigs_csv.getvalue()

Expand All @@ -220,7 +220,7 @@ def test_merged_contig(tmpdir, hcv_db):
"""

with merged_contigs_path.open() as merged_contigs_csv:
fasta_to_csv(str(contigs_fasta),
fasta_to_csv(contigs_fasta,
contigs_csv,
merged_contigs_csv=merged_contigs_csv)

Expand Down
3 changes: 2 additions & 1 deletion micall/utils/contig_blaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from itertools import groupby
from operator import itemgetter
from tempfile import NamedTemporaryFile
from pathlib import Path

from micall.utils.fasta_to_csv import fasta_to_csv

Expand Down Expand Up @@ -44,7 +45,7 @@ def main():
fasta_file.flush()
new_contigs_csv = StringIO()
blast_csv = StringIO()
fasta_to_csv(fasta_file.name, new_contigs_csv, blast_csv=blast_csv)
fasta_to_csv(Path(fasta_file.name), new_contigs_csv, blast_csv=blast_csv)
blast_csv.seek(0)
for source_contig_num, contig_rows in groupby(DictReader(blast_csv),
itemgetter('contig_num')):
Expand Down
45 changes: 18 additions & 27 deletions micall/utils/fasta_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ def default_database() -> Iterator[Path]:

def read_assembled_contigs(group_refs: Dict[str, str],
genotypes: Dict[str, typing.Tuple[str, float]],
contigs_fasta_path: str) -> Iterable[GenotypedContig]:
contigs_fasta: Path) -> Iterable[GenotypedContig]:
"""Read assembled contigs and generate GenotypedContig objects.
Args:
group_refs (Dict[str, str]): Mapping of reference names to group references.
genotypes (Dict[str, Tuple[str, float]]): Mapping of contig names to (reference name, match fraction).
contigs_fasta_path (str): Path to the FASTA file containing contig sequences.
contigs_fasta (Path): Path to the FASTA file containing contig sequences.
Returns:
Iterable[GenotypedContig]: An iterable of GenotypedContig objects.
"""
projects = ProjectConfig.loadDefault()

for i, record in enumerate(SeqIO.parse(contigs_fasta_path, "fasta")):
for i, record in enumerate(SeqIO.parse(contigs_fasta, "fasta")):
(ref_name, match_fraction) = genotypes.get(record.name, ('unknown', 0))
seq = record.seq
if match_fraction < 0:
Expand Down Expand Up @@ -118,29 +118,29 @@ def init_contigs_refs(contigs_csv: TextIO) -> DictWriter:
def write_contigs(writer: DictWriter,
group_refs: Dict[str, str],
genotypes: Dict[str, typing.Tuple[str, float]],
contigs_fasta_path: str):
contigs_fasta: Path):
"""Write contigs to a CSV file.
Args:
writer (DictWriter): CSV writer to write contigs.
group_refs (Dict[str, str]): Mapping of reference names to group references.
genotypes (Dict[str, Tuple[str, float]]): Mapping of contig names to (reference name, match fraction).
contigs_fasta_path (str): Path to the FASTA file containing contig sequences.
contigs_fasta (Path): Path to the FASTA file containing contig sequences.
"""
for contig in read_assembled_contigs(group_refs, genotypes, contigs_fasta_path):
for contig in read_assembled_contigs(group_refs, genotypes, contigs_fasta):
writer.writerow(dict(ref=contig.ref_name,
match=contig.match_fraction,
group_ref=contig.group_ref,
contig=contig.seq))


def genotype(fasta: str, db: Optional[Path] = None,
def genotype(contigs_fasta: Path, db: Optional[Path] = None,
blast_csv: Optional[TextIO] = None,
group_refs: Optional[Dict[str, str]] = None) -> Dict[str, typing.Tuple[str, float]]:
"""Use Blastn to search for the genotype of a set of reference sequences.
Args:
fasta (str): File path of the FASTA file containing the query sequences.
contigs_fasta (Path): File path of the FASTA file containing the query sequences.
db (Optional[Path]): File path of the database to search for matches.
blast_csv (Optional[TextIO]): Open file to write the blast matches to, or None.
group_refs (Optional[Dict[str, str]]): Dictionary to fill with the mapping from
Expand All @@ -151,25 +151,16 @@ def genotype(fasta: str, db: Optional[Path] = None,
"""

contig_nums: Dict[str, int] = {} # {contig_name: contig_num}
with open(fasta) as f:
with open(contigs_fasta) as f:
for line in f:
if line.startswith('>'):
contig_name = line[1:-1]
contig_nums[contig_name] = len(contig_nums) + 1
blast_columns = ['qaccver',
'saccver',
'pident',
'score',
'qcovhsp',
'qstart',
'qend',
'sstart',
'send']

def invoke_blast(db: Path) -> str:
return Blastn().genotype(
contigs_fasta=Path(fasta),
database=Path(db),
contigs_fasta=contigs_fasta,
database=db,
)

if db is None:
Expand All @@ -179,7 +170,7 @@ def invoke_blast(db: Path) -> str:
stdout = invoke_blast(db)

samples = {} # {query_name: (subject_name, matched_fraction)}
matches = sorted(DictReader(StringIO(stdout), blast_columns),
matches = sorted(DictReader(StringIO(stdout), Blastn.BLAST_COLUMNS),
key=lambda row: (row['qaccver'], float(row['score'])))
if not blast_csv:
blast_writer = None
Expand Down Expand Up @@ -238,32 +229,32 @@ def invoke_blast(db: Path) -> str:
return samples


def fasta_to_csv(contigs_fasta_path: str,
def fasta_to_csv(contigs_fasta: Path,
contigs_csv: TextIO,
merged_contigs_csv: Optional[TextIO] = None,
blast_csv: Optional[TextIO] = None) -> None:
"""Run BLAST search to identify contig sequences and write them to CSV.
Args:
contigs_fasta_path (str): Path to the FASTA file containing contig sequences.
contigs_fasta (Path): Path to the FASTA file containing contig sequences.
contigs_csv (TextIO): Open file to write assembled contigs to.
merged_contigs_csv (Optional[TextIO]): Open file to read contigs that were merged from amplicon reads, or None.
blast_csv (Optional[TextIO]): Open file to write BLAST search results for each contig.
"""

with open(contigs_fasta_path, 'a') as contigs_fasta:
with open(contigs_fasta, 'a') as contigs_fasta_writer:
if merged_contigs_csv is not None:
contig_reader = DictReader(merged_contigs_csv)
for i, row in enumerate(contig_reader, 1):
contig_name = f'merged-contig-{i}'
contigs_fasta.write(f">{contig_name}\n{row['contig']}\n")
contigs_fasta_writer.write(f">{contig_name}\n{row['contig']}\n")

writer = init_contigs_refs(cast(TextIO, contigs_csv))
group_refs: Dict[str, str] = {}

genotypes = genotype(contigs_fasta_path, blast_csv=blast_csv, group_refs=group_refs)
genotypes = genotype(contigs_fasta, blast_csv=blast_csv, group_refs=group_refs)

write_contigs(writer, group_refs, genotypes, contigs_fasta_path)
write_contigs(writer, group_refs, genotypes, contigs_fasta)
contigs_csv.flush()


Expand Down

0 comments on commit d9b433e

Please sign in to comment.