diff --git a/micall/drivers/sample.py b/micall/drivers/sample.py
index c63c3353c..20e64fd09 100644
--- a/micall/drivers/sample.py
+++ b/micall/drivers/sample.py
@@ -429,7 +429,7 @@ def run_denovo(self, excluded_seeds):
         with open(self.unstitched_contigs_csv, 'w') as unstitched_contigs_csv, \
                 open(self.merged_contigs_csv, 'r') as merged_contigs_csv, \
                 open(self.blast_csv, 'w') as blast_csv:
-            fasta_to_csv(self.unstitched_contigs_fasta,
+            fasta_to_csv(Path(self.unstitched_contigs_fasta),
                          unstitched_contigs_csv,
                          merged_contigs_csv,
                          blast_csv=blast_csv,
diff --git a/micall/tests/test_fasta_to_csv.py b/micall/tests/test_fasta_to_csv.py
index 4ab951b6d..70edbf9d6 100644
--- a/micall/tests/test_fasta_to_csv.py
+++ b/micall/tests/test_fasta_to_csv.py
@@ -16,7 +16,7 @@ def DEFAULT_DATABASE():

 @pytest.fixture(scope='session', name='hcv_db')
 def check_hcv_db(DEFAULT_DATABASE):
-    db_path = Path(DEFAULT_DATABASE)
+    db_path = DEFAULT_DATABASE
     index_path = db_path.parent / "refs.fasta.nin"
     build_needed = not index_path.exists()
     if not build_needed:
@@ -32,7 +32,7 @@ def check_hcv_db(DEFAULT_DATABASE):


 def test_make_blast_db_excludes_hivgha(hcv_db, DEFAULT_DATABASE):
-    fasta_path = Path(DEFAULT_DATABASE)
+    fasta_path = DEFAULT_DATABASE
     with fasta_path.open() as f:
         for reference in SeqIO.parse(f, 'fasta'):
             # Exclude the Ghana project, because they're recombinant.
@@ -55,7 +55,7 @@ def test_genotype(tmpdir, hcv_db):
 1,HCV-1a,41,1.0,100,1,41,8187,8227
 """

-    genotype(str(contigs_fasta), blast_csv=blast_csv)
+    genotype(contigs_fasta, blast_csv=blast_csv)

     assert expected_blast_csv == blast_csv.getvalue()

@@ -75,7 +75,7 @@ def test_fasta_to_csv_two_sequences(tmpdir, hcv_db):
 HCV-1a,1.0,HCV-1a,CAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()

@@ -98,7 +98,7 @@ def test_fasta_to_csv_two_groups(tmpdir, hcv_db):
 HCV-2b,1.0,HCV-2b,TGCAATGACAGCTTACAGACGGGTTTCCTCGCTTCCTTGTTTTACACCCA
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()

@@ -115,7 +115,7 @@ def test_fasta_to_csv_not_found(tmpdir, hcv_db):
 unknown,0,,CATCACATAGGAGA
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()

@@ -135,7 +135,7 @@ def test_fasta_to_csv_partial_match(tmpdir, hcv_db):
 HCV-1a,0.75,HCV-1a,CATCACATAGGAGACAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()

@@ -156,7 +156,7 @@ def test_fasta_to_csv_reversed_match(tmpdir, hcv_db):
 HCV-1a,0.75,HCV-1a,CATCACATAGGAGACAGGGCTCCAGGACTGCACCATGCTCGTGTGTGGCGACGAC
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()

@@ -183,7 +183,7 @@ def test_fasta_to_csv(tmpdir, hcv_db):
 1,HCV-1a,41,1.0,100,1,41,8187,8227
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv, blast_csv=blast_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv, blast_csv=blast_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()
     assert expected_blast_csv == blast_csv.getvalue()

@@ -198,7 +198,7 @@ def test_fasta_to_csv_none(tmpdir, hcv_db):
 ref,match,group_ref,contig
 """

-    fasta_to_csv(str(contigs_fasta), contigs_csv)
+    fasta_to_csv(contigs_fasta, contigs_csv)

     assert expected_contigs_csv == contigs_csv.getvalue()

@@ -220,7 +220,7 @@ def test_merged_contig(tmpdir, hcv_db):
 """

     with merged_contigs_path.open() as merged_contigs_csv:
-        fasta_to_csv(str(contigs_fasta),
+        fasta_to_csv(contigs_fasta,
                      contigs_csv,
                      merged_contigs_csv=merged_contigs_csv)

diff --git a/micall/utils/contig_blaster.py b/micall/utils/contig_blaster.py
index ccdbacd0a..20403bb9c 100644
--- a/micall/utils/contig_blaster.py
+++ b/micall/utils/contig_blaster.py
@@ -4,6 +4,7 @@
 from itertools import groupby
 from operator import itemgetter
 from tempfile import NamedTemporaryFile
+from pathlib import Path

 from micall.utils.fasta_to_csv import fasta_to_csv

@@ -44,7 +45,7 @@ def main():
     fasta_file.flush()
     new_contigs_csv = StringIO()
     blast_csv = StringIO()
-    fasta_to_csv(fasta_file.name, new_contigs_csv, blast_csv=blast_csv)
+    fasta_to_csv(Path(fasta_file.name), new_contigs_csv, blast_csv=blast_csv)
     blast_csv.seek(0)
     for source_contig_num, contig_rows in groupby(DictReader(blast_csv),
                                                   itemgetter('contig_num')):
diff --git a/micall/utils/contig_summary.py b/micall/utils/contig_summary.py
index 2df68bdf9..7b8529b99 100644
--- a/micall/utils/contig_summary.py
+++ b/micall/utils/contig_summary.py
@@ -3,9 +3,8 @@
 from io import StringIO
 from pathlib import Path

-from Bio.Blast.Applications import NcbiblastnCommandline
-
 from micall.utils.fasta_to_csv import default_database
+from micall.utils.externals import Blastn

 import matplotlib
 matplotlib.use('Agg')
@@ -13,16 +12,6 @@
 DEFAULT_SCRATCH_PATH = (Path(__file__).parent.parent / "tests" / "working" /
                         "basespace_linked_data" / "scratch")

-BLAST_COLUMNS = ['qaccver',
-                 'saccver',
-                 'qlen',
-                 'pident',
-                 'score',
-                 'qcovhsp',
-                 'qstart',
-                 'qend',
-                 'sstart',
-                 'send']


 def parse_args():
@@ -40,7 +29,6 @@ def main():
     sample_dirs = [d for d in args.scratch.iterdir() if d.is_dir()]
     contig_plots_path = sample_dirs[0].parent / 'contig_plots'
     contig_plots_path.mkdir(exist_ok=True)
-    blast_format = f'"10 {" ".join(BLAST_COLUMNS)}"'
     sample_dirs.sort()
     incomplete_count = empty_count = 0
     for sample_dir in sample_dirs:
@@ -62,16 +50,10 @@ def main():
             continue
         contigs_fasta_path, = contigs_fasta_paths
         with default_database() as DEFAULT_DATABASE:
-            cline = NcbiblastnCommandline(query=str(contigs_fasta_path),
-                                          db=DEFAULT_DATABASE,
-                                          outfmt=blast_format,
-                                          evalue=0.0001,
-                                          gapopen=5,
-                                          gapextend=2,
-                                          penalty=-3,
-                                          reward=1,
-                                          max_target_seqs=5000)
-            stdout, _ = cline(stderr=False)
+            stdout = Blastn().genotype(
+                contigs_fasta=contigs_fasta_path,
+                database=DEFAULT_DATABASE,
+            )
         plot_contigs(sample_dir, stdout)
         plot_path = contig_plots_path / (sample_dir.name + '.png')
         plt.savefig(str(plot_path))
@@ -84,7 +66,7 @@ def main():


 def plot_contigs(sample_dir, contigs_csv):
-    reader = DictReader(StringIO(contigs_csv), BLAST_COLUMNS)
+    reader = DictReader(StringIO(contigs_csv), Blastn.BLAST_COLUMNS)
     rows = list(reader)
     rows.sort(reverse=True, key=lambda row: int(row['score']))
     contig_names = sorted({row['qaccver'] for row in rows})
diff --git a/micall/utils/externals.py b/micall/utils/externals.py
index 9bc08dee5..3fb790cf0 100644
--- a/micall/utils/externals.py
+++ b/micall/utils/externals.py
@@ -4,32 +4,48 @@ import re
 from subprocess import CalledProcessError
 from pathlib import Path
+import logging
+from typing import Optional, List, Any, Iterator
+from functools import cached_property
+from abc import ABC, abstractmethod
+import shutil


 class AssetWrapper(object):
     """ Wraps a packaged asset, and finds its path.
     """
-    def __init__(self, path, **kwargs):
+    def __init__(self, path: Path):
         # noinspection PyArgumentList
-        super(AssetWrapper, self).__init__(**kwargs)
         app_dir = Path(__file__).parent.parent / "assets"
         local_path = app_dir / path
         if local_path.exists():
-            self.path = str(local_path)
+            self.path = local_path
         else:
-            self.path = os.path.join(getattr(sys, '_MEIPASS', ''), path)
+            self.path = Path(getattr(sys, '_MEIPASS', '')) / path


-class CommandWrapper(AssetWrapper):
+class CommandWrapper(ABC, AssetWrapper):
     """ Wraps an external tool, and builds the command lines for it. """
-    def __init__(self, version, execname, logger=None, **kwargs):
-        super(CommandWrapper, self).__init__(path=execname, **kwargs)
-        self.version = version
-        self.logger = logger
-
-    def build_args(self, args):
-        return [self.path] + args
-
-    def check_output(self, args=None, *popenargs, **kwargs):
+    def __init__(self,
+                 version: Optional[str],
+                 execname: str,
+                 logger: Optional[logging.Logger] = None,
+                 ) -> None:
+
+        path = shutil.which(execname)
+        if path is None:
+            raise RuntimeError(f"Cannot find {execname!r} executable.")
+
+        super(CommandWrapper, self).__init__(path=Path(path))
+        self._version: Optional[str] = version
+        self._logger: Optional[logging.Logger] = logger
+        if self._version is not None:
+            assert self.version == self._version
+
+    def build_args(self, args: List[str]) -> List[str]:
+        return [str(self.path)] + args
+
+    def check_output(self, args: List[str] = [],
+                     *popenargs: Any, **kwargs: Any) -> str:
         """ Run command with arguments and return its output as a byte string.

         See subprocess.check_output() for details.
@@ -39,16 +55,16 @@ def check_output(self, args=None, *popenargs, **kwargs):
         @return the command's output
         """
         try:
-            startupinfo = subprocess.STARTUPINFO()
+            startupinfo = subprocess.STARTUPINFO()  # type: ignore[attr-defined]
             # Needed on Windows, fails elsewhere.
             # noinspection PyUnresolvedReferences
-            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW  # type: ignore[attr-defined]
             kwargs.setdefault('startupinfo', startupinfo)
         except AttributeError:
             pass

         kwargs.setdefault('universal_newlines', True)
-        final_args = self.build_args(args or [])
+        final_args = self.build_args(args)
         try:
             return subprocess.check_output(final_args, *popenargs, **kwargs)
         except OSError as ex:
@@ -56,7 +72,8 @@ def check_output(self, args=None, *popenargs, **kwargs):
             ex.strerror = f'{original_error} for command {final_args}.'
             raise

-    def create_process(self, args=None, *popenargs, **kwargs):
+    def create_process(self, args: List[str] = [],
+                       *popenargs: Any, **kwargs: Any) -> subprocess.Popen:
         """ Execute a child program in a new process.

         See subprocess.Popen class for details.
@@ -66,10 +83,10 @@ def create_process(self, args=None, *popenargs, **kwargs):
         @return the new Popen object
         """
         try:
-            startupinfo = subprocess.STARTUPINFO()
+            startupinfo = subprocess.STARTUPINFO()  # type: ignore[attr-defined]
             # Needed on Windows, fails elsewhere.
             # noinspection PyUnresolvedReferences
-            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
+            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW  # type: ignore[attr-defined]
             kwargs.setdefault('startupinfo', startupinfo)
         except AttributeError:
             pass
@@ -78,12 +95,14 @@ def create_process(self, args=None, *popenargs, **kwargs):
             kwargs.setdefault('stdin', devnull)
             return subprocess.Popen(self.build_args(args or []),
                                     *popenargs,
                                     **kwargs)

-    def check_logger(self):
+    @property
+    def logger(self) -> logging.Logger:
         """ Raise an exception if no logger is set for this command. """
-        if self.logger is None:
+        if self._logger is None:
             raise RuntimeError('logger not set for command {}'.format(self.path))
+        return self._logger

-    def log_call(self, args, format_string='%s'):
+    def log_call(self, args: List[str], format_string: str = '%s') -> None:
         """ Launch a subprocess, and log any output to the debug logger.

         Raise an exception if the return code is not zero. This assumes only a
@@ -93,12 +112,13 @@ def log_call(self, args, format_string='%s'):
         @param format_string: A template for the debug message that will have
             each line of output formatted with it.
         """
-        self.check_logger()
+
         output = self.check_output(args, stderr=subprocess.STDOUT)
         for line in output.splitlines():
             self.logger.debug(format_string, line)

-    def yield_output(self, args, *popenargs, **kwargs):
+    def yield_output(self, args: List[str], *popenargs: Any, **kwargs: Any
+                     ) -> Iterator[str]:
         """ Launch a subprocess, and yield the lines of standard output.

         Raise an exception if the return code is not zero.
@@ -112,6 +132,7 @@ def yield_output(self, args, *popenargs, **kwargs):
                                   stdout=subprocess.PIPE,
                                   *popenargs,
                                   **kwargs)
+        assert p.stdout is not None
         for line in p.stdout:
             yield line
         p.wait()
@@ -119,23 +140,27 @@ def yield_output(self, args, *popenargs, **kwargs):
             raise subprocess.CalledProcessError(p.returncode,
                                                 self.build_args(args))

-    def redirect_call(self, args, outpath, format_string='%s', ignored=None):
+    def redirect_call(self,
+                      args: List[str],
+                      outpath: Path,
+                      format_string: str = '%s',
+                      ignored: Optional[re.Pattern] = None) -> None:
         """ Launch a subprocess, and redirect the output to a file.

         Raise an exception if the return code is not zero. Standard error is
         logged to the warn logger.
         @param args: A list of arguments to pass to subprocess.Popen().
-        @param outpath: a filename that stdout should be redirected to. If you
+        @param outpath: a filepath that stdout should be redirected to. If you
             don't need to redirect the output, then just use subprocess.check_call().
         @param format_string: A template for the debug message that will have
             each line of standard error formatted with it.
         @param ignored: A regular expression pattern for stderr messages that
             should not be logged.
         """
-        self.check_logger()
         with open(outpath, 'w') as outfile:
             p = self.create_process(args, stdout=outfile,
                                     stderr=subprocess.PIPE)
+            assert p.stderr is not None
             for line in p.stderr:
                 if not ignored or not re.search(ignored, line):
                     self.logger.warn(format_string, line.rstrip())
@@ -144,46 +169,63 @@ def redirect_call(self, args, outpath, format_string='%s', ignored=None):
             raise subprocess.CalledProcessError(p.returncode,
                                                 self.build_args(args))

-    def validate_version(self, version_found):
-        if self.version is None:
-            self.version = version_found
-        elif self.version != version_found:
+    @abstractmethod
+    def get_version(self) -> str: ...
+
+    def _validate_version(self, version_found: str) -> None:
+        if self._version is None:
+            pass
+        elif self._version != version_found:
             message = '{} version incompatibility: expected {}, found {}'.format(
                 self.path,
-                self.version,
+                self._version,
                 version_found)
             raise RuntimeError(message)

+    @cached_property
+    def version(self) -> str:
+        version_found = self.get_version()
+        self._validate_version(version_found)
+        return version_found
+

 class CutAdapt(CommandWrapper):
-    def __init__(self, version=None, execname='cutadapt', logger=None, **kwargs):
-        super(CutAdapt, self).__init__(version, execname, logger, **kwargs)
+    def __init__(self,
+                 version: Optional[str] = None,
+                 execname: str = 'cutadapt',
+                 logger: Optional[logging.Logger] = None,
+                 ):
+        super(CutAdapt, self).__init__(version, execname, logger)
+
+    def get_version(self):
         stdout = self.check_output(['--version'], stderr=subprocess.STDOUT)
-        version_found = stdout.split('\n')[0].split()[-1]
-        self.validate_version(version_found)
+        return stdout.split('\n')[0].split()[-1]


 class Bowtie2(CommandWrapper):
-    def __init__(self, version, execname='bowtie2', logger=None, **kwargs):
-        super(Bowtie2, self).__init__(version, execname, logger, **kwargs)
+    def __init__(self,
+                 version: Optional[str] = None,
+                 execname: str = 'bowtie2',
+                 logger: Optional[logging.Logger] = None,
+                 ):
+        super(Bowtie2, self).__init__(version, execname, logger)
+
+    def get_version(self):
         stdout = self.check_output(['--version'], stderr=subprocess.STDOUT)
-        version_found = stdout.split('\n')[0].split()[-1]
-        self.validate_version(version_found)
+        return stdout.split('\n')[0].split()[-1]


 class Bowtie2Build(CommandWrapper):
     def __init__(self,
-                 version,
-                 execname='bowtie2-build',
-                 logger=None,
-                 **kwargs):
-        super(Bowtie2Build, self).__init__(version,
-                                           execname,
-                                           logger,
-                                           **kwargs)
+                 version: Optional[str] = None,
+                 execname: str = 'bowtie2-build',
+                 logger: Optional[logging.Logger] = None,
+                 ):
+        super(Bowtie2Build, self).__init__(version, execname, logger)
+
+    def get_version(self):
         stdout = self.check_output(['--version'], stderr=subprocess.STDOUT)
-        version_found = stdout.split('\n')[0].split()[-1]
-        self.validate_version(version_found)
+        return stdout.split('\n')[0].split()[-1]

     def build(self, ref_path, reffile_template):
         """ Build an index from a reference file.
@@ -192,9 +234,9 @@ def build(self, ref_path, reffile_template):
             be small enough to use with a small index (4GB or less).
         @param reffile_template: file name template for the index files.
         """

+        small_index_max_size = 4 * 1024**3 - 200  # From bowtie2-build wrapper
         assert os.stat(ref_path).st_size <= small_index_max_size
-        self.check_logger()
         lines = []
         try:
             for line in self.yield_output(['--wrapper',
@@ -214,7 +256,7 @@ def build(self, ref_path, reffile_template):
             self.logger.debug(line)


-class LineCounter(object):
+class LineCounter:
     """ Run the wc command to count lines in a file.

     Fall back to pure Python if wc command is not available.
@@ -222,9 +264,9 @@ class LineCounter(object):
     Inspired by zed: https://gist.github.com/zed/0ac760859e614cd03652
     """
     def __init__(self):
-        self.command = 'wc'
+        self.command: str = 'wc'

-    def count(self, filename, gzip=False):
+    def count(self, filename: Path, gzip: bool = False) -> int:
         if self.command:
             try:
                 if gzip:
@@ -238,10 +280,10 @@ def count(self, filename, gzip=False):

                 return int(wc_output.split()[0])
             except CalledProcessError:
-                self.command = None
+                self.command = ''
         return self.buffered_count(filename)

-    def buffered_count(self, filename):
+    def buffered_count(self, filename: Path) -> int:
         with open(filename) as f:
             lines = 0
             buf_size = 1024 * 1024
@@ -253,3 +295,43 @@ def buffered_count(self, filename):
                 buf = read_f(buf_size)

         return lines
+
+
+class Blastn(CommandWrapper):
+    BLAST_COLUMNS = ['qaccver',
+                     'saccver',
+                     'qlen',
+                     'pident',
+                     'score',
+                     'qcovhsp',
+                     'qstart',
+                     'qend',
+                     'sstart',
+                     'send',
+                     ]
+
+    def __init__(self,
+                 version: Optional[str] = None,
+                 execname: str = 'blastn',
+                 logger: Optional[logging.Logger] = None,
+                 ):
+        super(Blastn, self).__init__(version, execname, logger)
+
+    def get_version(self):
+        stdout = self.check_output(['-version'], stderr=subprocess.STDOUT)
+        return stdout.split('\n')[0].split()[-1]
+
+    def genotype(self, database: Path, contigs_fasta: Path) -> str:
+        blast_format = '10 ' + " ".join(Blastn.BLAST_COLUMNS)
+        program = ["-outfmt", blast_format,
+                   "-query", str(contigs_fasta),
+                   "-db", str(database),
+                   "-evalue", "0.0001",
+                   "-max_target_seqs", "5000",
+                   "-gapopen", "5",
+                   "-gapextend", "2",
+                   "-penalty", "-3",
+                   "-reward", "1",
+                   ]
+        stdout = self.check_output(program)
+        return stdout
diff --git a/micall/utils/fasta_to_csv.py b/micall/utils/fasta_to_csv.py
index 892291a98..6c52465fe 100644
--- a/micall/utils/fasta_to_csv.py
+++ b/micall/utils/fasta_to_csv.py
@@ -14,10 +14,10 @@
 import importlib.resources as resources

 from Bio import SeqIO
-from Bio.Blast.Applications import NcbiblastnCommandline

 from micall.core.project_config import ProjectConfig
 from micall.utils.contig_stitcher_contigs import GenotypedContig
+from micall.utils.externals import Blastn


 @contextlib.contextmanager
@@ -55,27 +55,27 @@ def reference_dir() -> Iterator[Path]:


 @contextlib.contextmanager
-def default_database() -> Iterator[str]:
+def default_database() -> Iterator[Path]:
     with reference_dir() as blast_db:
-        yield str(blast_db / "refs.fasta")
+        yield blast_db / "refs.fasta"


 def read_assembled_contigs(group_refs: Dict[str, str],
                            genotypes: Dict[str, typing.Tuple[str, float]],
-                           contigs_fasta_path: str) -> Iterable[GenotypedContig]:
+                           contigs_fasta: Path) -> Iterable[GenotypedContig]:
     """Read assembled contigs and generate GenotypedContig objects.

     Args:
         group_refs (Dict[str, str]): Mapping of reference names to group references.
         genotypes (Dict[str, Tuple[str, float]]): Mapping of contig names to
            (reference name, match fraction).
-        contigs_fasta_path (str): Path to the FASTA file containing contig sequences.
+        contigs_fasta (Path): Path to the FASTA file containing contig sequences.

     Returns:
         Iterable[GenotypedContig]: An iterable of GenotypedContig objects.
     """
     projects = ProjectConfig.loadDefault()
-    for i, record in enumerate(SeqIO.parse(contigs_fasta_path, "fasta")):
+    for i, record in enumerate(SeqIO.parse(contigs_fasta, "fasta")):
         (ref_name, match_fraction) = genotypes.get(record.name, ('unknown', 0))
         seq = record.seq
         if match_fraction < 0:
@@ -118,29 +118,29 @@ def init_contigs_refs(contigs_csv: TextIO) -> DictWriter:
 def write_contigs(writer: DictWriter,
                   group_refs: Dict[str, str],
                   genotypes: Dict[str, typing.Tuple[str, float]],
-                  contigs_fasta_path: str):
+                  contigs_fasta: Path):
     """Write contigs to a CSV file.

     Args:
         writer (DictWriter): CSV writer to write contigs.
         group_refs (Dict[str, str]): Mapping of reference names to group references.
         genotypes (Dict[str, Tuple[str, float]]): Mapping of contig names to
            (reference name, match fraction).
-        contigs_fasta_path (str): Path to the FASTA file containing contig sequences.
+        contigs_fasta (Path): Path to the FASTA file containing contig sequences.
     """
-    for contig in read_assembled_contigs(group_refs, genotypes, contigs_fasta_path):
+    for contig in read_assembled_contigs(group_refs, genotypes, contigs_fasta):
         writer.writerow(dict(ref=contig.ref_name,
                              match=contig.match_fraction,
                              group_ref=contig.group_ref,
                              contig=contig.seq))


-def genotype(fasta: str, db: Optional[str] = None,
+def genotype(contigs_fasta: Path, db: Optional[Path] = None,
             blast_csv: Optional[TextIO] = None,
             group_refs: Optional[Dict[str, str]] = None) -> Dict[str, typing.Tuple[str, float]]:
     """Use Blastn to search for the genotype of a set of reference sequences.

     Args:
-        fasta (str): File path of the FASTA file containing the query sequences.
+        contigs_fasta (Path): File path of the FASTA file containing the query sequences.
         db (Optional[str]): File path of the database to search for matches.
         blast_csv (Optional[TextIO]): Open file to write the blast matches to, or None.
         group_refs (Optional[Dict[str, str]]): Dictionary to fill with the mapping from
@@ -151,33 +151,17 @@ def genotype(fasta: str, db: Optional[str] = None,

     """
     contig_nums: Dict[str, int] = {}  # {contig_name: contig_num}
-    with open(fasta) as f:
+    with open(contigs_fasta) as f:
         for line in f:
             if line.startswith('>'):
                 contig_name = line[1:-1]
                 contig_nums[contig_name] = len(contig_nums) + 1
-    blast_columns = ['qaccver',
-                     'saccver',
-                     'pident',
-                     'score',
-                     'qcovhsp',
-                     'qstart',
-                     'qend',
-                     'sstart',
-                     'send']
-
-    def invoke_blast(db: str) -> str:
-        cline = NcbiblastnCommandline(query=fasta,
-                                      db=db,
-                                      outfmt=f'"10 {" ".join(blast_columns)}"',
-                                      evalue=0.0001,
-                                      gapopen=5,
-                                      gapextend=2,
-                                      penalty=-3,
-                                      reward=1,
-                                      max_target_seqs=5000)
-        stdout, _ = cline()
-        return stdout
+
+    def invoke_blast(db: Path) -> str:
+        return Blastn().genotype(
+            contigs_fasta=contigs_fasta,
+            database=db,
+        )

     if db is None:
         with default_database() as db:
@@ -186,7 +170,7 @@ def invoke_blast(db: str) -> str:
         stdout = invoke_blast(db)

     samples = {}  # {query_name: (subject_name, matched_fraction)}
-    matches = sorted(DictReader(StringIO(stdout), blast_columns),
+    matches = sorted(DictReader(StringIO(stdout), Blastn.BLAST_COLUMNS),
                      key=lambda row: (row['qaccver'], float(row['score'])))
     if not blast_csv:
         blast_writer = None
@@ -245,32 +229,32 @@ def invoke_blast(db: str) -> str:
     return samples


-def fasta_to_csv(contigs_fasta_path: str,
+def fasta_to_csv(contigs_fasta: Path,
                  contigs_csv: TextIO,
                  merged_contigs_csv: Optional[TextIO] = None,
                  blast_csv: Optional[TextIO] = None) -> None:
     """Run BLAST search to identify contig sequences and write them to CSV.

     Args:
-        contigs_fasta_path (str): Path to the FASTA file containing contig sequences.
+        contigs_fasta (Path): Path to the FASTA file containing contig sequences.
         contigs_csv (TextIO): Open file to write assembled contigs to.
         merged_contigs_csv: open file to read contigs that were merged from amplicon reads.
         blast_csv (Optional[TextIO]): Open file to write BLAST search results for each contig.
     """

-    with open(contigs_fasta_path, 'a') as contigs_fasta:
+    with open(contigs_fasta, 'a') as contigs_fasta_writer:
         if merged_contigs_csv is not None:
             contig_reader = DictReader(merged_contigs_csv)
             for i, row in enumerate(contig_reader, 1):
                 contig_name = f'merged-contig-{i}'
-                contigs_fasta.write(f">{contig_name}\n{row['contig']}\n")
+                contigs_fasta_writer.write(f">{contig_name}\n{row['contig']}\n")

     writer = init_contigs_refs(cast(TextIO, contigs_csv))
     group_refs: Dict[str, str] = {}

-    genotypes = genotype(contigs_fasta_path, blast_csv=blast_csv, group_refs=group_refs)
+    genotypes = genotype(contigs_fasta, blast_csv=blast_csv, group_refs=group_refs)

-    write_contigs(writer, group_refs, genotypes, contigs_fasta_path)
+    write_contigs(writer, group_refs, genotypes, contigs_fasta)

     contigs_csv.flush()
diff --git a/pyproject.toml b/pyproject.toml
index ea1ecdd68..376a3e972 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -210,7 +210,6 @@ exclude = '''(?x)
     ^micall/utils/coverage_data[.]py$|
    ^micall/utils/dd[.]py$|
    ^micall/utils/denovo_simplify[.]py$|
-    ^micall/utils/externals[.]py$|
    ^micall/utils/fetch_sequences[.]py$|
    ^micall/utils/find_by_coverage[.]py$|
    ^micall/utils/find_chimera[.]py$|
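Usage note (not part of the diff): a minimal sketch of how the new Blastn wrapper from micall/utils/externals.py is meant to stand in for the removed NcbiblastnCommandline calls. It assumes blastn is installed on the PATH and that the refs.fasta BLAST index has already been built (as the check_hcv_db fixture above arranges); the contigs.fasta filename is hypothetical.

    from pathlib import Path

    from micall.utils.externals import Blastn
    from micall.utils.fasta_to_csv import default_database

    blastn = Blastn()      # raises RuntimeError if no blastn executable is on the PATH
    print(blastn.version)  # cached_property: runs `blastn -version` once, then caches

    with default_database() as database:  # Path to the packaged refs.fasta
        stdout = blastn.genotype(database=database,
                                 contigs_fasta=Path("contigs.fasta"))
    # stdout holds CSV rows in the column order given by Blastn.BLAST_COLUMNS.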
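Usage note (not part of the diff): a minimal sketch of the Path-based fasta_to_csv() call that the updated tests and run_denovo() now use; like genotype(), it shells out to blastn against the default database. The assembled_contigs.fasta name is hypothetical, and StringIO objects stand in for the real CSV output files.

    from io import StringIO
    from pathlib import Path

    from micall.utils.fasta_to_csv import fasta_to_csv

    contigs_csv = StringIO()
    blast_csv = StringIO()
    fasta_to_csv(Path("assembled_contigs.fasta"),  # a Path now, not a str
                 contigs_csv,
                 blast_csv=blast_csv)
    print(contigs_csv.getvalue())  # rows under the header ref,match,group_ref,contig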