Skip to content

Commit

Permalink
Merge pull request #30 from apriltuesday/EVA-2386
Browse files Browse the repository at this point in the history
EVA-2386 - Automate backlog processing
  • Loading branch information
apriltuesday authored Apr 29, 2021
2 parents 61dab26 + 69bd14f commit 37026ec
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 64 deletions.
59 changes: 59 additions & 0 deletions bin/prepare_backlog_study.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python

# Copyright 2021 EMBL - European Bioinformatics Institute
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import sys
from argparse import ArgumentParser

from ebi_eva_common_pyutils.logger import logging_config as log_cfg

# Make the repository root (the parent of this script's directory) importable *before* importing
# anything from eva_submission, so every project import below benefits from the path tweak.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from eva_submission.eload_backlog import EloadBacklog
from eva_submission.eload_validation import EloadValidation
from eva_submission.submission_config import load_config

logger = log_cfg.get_logger(__name__)


def main():
    """Command-line entry point: prepare a backlog study and validate its VCF files.

    Parses ``--eload`` (required integer) and ``--debug``, configures logging to stdout,
    loads the submission config, fills in the eload config through ``EloadBacklog`` and
    then runs the ``assembly_check`` and ``vcf_check`` validation tasks.
    """
    parser = ArgumentParser(description='Prepare to process backlog study and validate VCFs.')
    parser.add_argument('--eload', required=True, type=int, help='The ELOAD number for this submission')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Set the script to output logging information at debug level')
    cli_args = parser.parse_args()

    log_cfg.add_stdout_handler()
    if cli_args.debug:
        log_cfg.set_log_level(logging.DEBUG)

    # Load the config_file from default location
    load_config()

    # Step 1: populate the eload config so the later pipeline stages have what they need.
    backlog = EloadBacklog(cli_args.eload)
    backlog.fill_in_config()

    # Step 2: run only the checks that apply to the VCF files themselves.
    tasks_to_run = ['assembly_check', 'vcf_check']
    validator = EloadValidation(cli_args.eload)
    validator.validate(tasks_to_run)

    logger.info('Preparation complete, if files are valid please run ingestion as normal.')


if __name__ == "__main__":
    main()
119 changes: 119 additions & 0 deletions eva_submission/eload_backlog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
from xml.etree import ElementTree as ET

from cached_property import cached_property
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query
import requests
from requests.auth import HTTPBasicAuth

from eva_submission.eload_submission import Eload
from eva_submission.eload_utils import get_metadata_conn, get_genome_fasta_and_report


class EloadBacklog(Eload):
    """Fills in the eload config for a backlog study from the metadata DB and ENA.

    Relies on attributes and helpers inherited from ``Eload`` (``eload_cfg``, ``eload_num``,
    ``eload``, ``_get_dir`` and ``error``), which are defined outside this module.
    """

    def fill_in_config(self):
        """Fills in config params from metadata DB and ENA, enabling later parts of pipeline to run."""
        self.eload_cfg.set('brokering', 'ena', 'PROJECT', value=self.project_accession)
        self.get_analysis_info()
        self.get_species_info()
        self.get_hold_date()
        self.eload_cfg.write()

    @cached_property
    def project_accession(self):
        """Project accession linked to this eload in ``evapro.project_eva_submission``.

        :raises ValueError: if the query does not return exactly one row.
        """
        with get_metadata_conn() as conn:
            query = f"select project_accession from evapro.project_eva_submission where eload_id={self.eload_num};"
            rows = get_all_results_for_query(conn, query)
        if len(rows) != 1:
            raise ValueError(f'No project accession for {self.eload} found in metadata DB.')
        return rows[0][0]

    @cached_property
    def project_alias(self):
        """Alias of the project as stored in ``evapro.project``.

        :raises ValueError: if the query does not return exactly one row.
        """
        with get_metadata_conn() as conn:
            # The accession is a string value and must be quoted as a SQL literal, exactly as the
            # other queries in this class do; unquoted it would be parsed as an identifier.
            query = f"select alias from evapro.project where project_accession='{self.project_accession}';"
            rows = get_all_results_for_query(conn, query)
        if len(rows) != 1:
            raise ValueError(f'No project alias for {self.project_accession} found in metadata DB.')
        return rows[0][0]

    def get_species_info(self):
        """Adds species info into the config: taxonomy id and scientific name,
        and assembly accession, fasta, and report.

        :raises ValueError: if the query does not return exactly one row for the project.
        """
        with get_metadata_conn() as conn:
            query = f"select a.taxonomy_id, b.scientific_name, d.assembly_accession " \
                    f"from project_taxonomy a " \
                    f"join taxonomy b on a.taxonomy_id=b.taxonomy_id " \
                    f"join assembly_set c on b.taxonomy_id=c.taxonomy_id " \
                    f"join accessioned_assembly d on c.assembly_set_id=d.assembly_set_id " \
                    f"where a.project_accession='{self.project_accession}';"
            rows = get_all_results_for_query(conn, query)
        if len(rows) != 1:
            raise ValueError(f'No taxonomy for {self.project_accession} found in metadata DB.')
        tax_id, sci_name, asm_accession = rows[0]
        self.eload_cfg.set('submission', 'taxonomy_id', value=tax_id)
        self.eload_cfg.set('submission', 'scientific_name', value=sci_name)
        self.eload_cfg.set('submission', 'assembly_accession', value=asm_accession)

        fasta_path, report_path = get_genome_fasta_and_report(sci_name, asm_accession)
        self.eload_cfg.set('submission', 'assembly_fasta', value=fasta_path)
        self.eload_cfg.set('submission', 'assembly_report', value=report_path)

    def get_analysis_info(self):
        """Adds analysis info into the config: analysis accession(s), and vcf and index files.

        :raises ValueError: if no analysis is found for the project, or if an analysis lacks
            either a VCF file or an index file in the metadata DB.
        :raises FileNotFoundError: if a file listed in the metadata DB is absent from the
            eload's ``vcf`` directory.
        """
        with get_metadata_conn() as conn:
            query = f"select a.analysis_accession, array_agg(c.filename) " \
                    f"from project_analysis a " \
                    f"join analysis_file b on a.analysis_accession=b.analysis_accession " \
                    f"join file c on b.file_id=c.file_id " \
                    f"where a.project_accession='{self.project_accession}' " \
                    f"group by a.analysis_accession;"
            rows = get_all_results_for_query(conn, query)
        if len(rows) == 0:
            raise ValueError(f'No analyses for {self.project_accession} found in metadata DB.')

        submitted_vcfs = []
        for analysis_accession, filenames in rows:
            # TODO for now we assume a single analysis per project as that's what the eload config supports
            self.eload_cfg.set('brokering', 'ena', 'ANALYSIS', value=analysis_accession)
            # Reset for every analysis: otherwise a missing file would surface as UnboundLocalError
            # on the first analysis, or silently reuse the previous analysis' file on later ones.
            vcf_file = None
            index_file = None
            for fn in filenames:
                full_path = os.path.join(self._get_dir('vcf'), fn)
                if not os.path.exists(full_path):
                    self.error(f'File not found: {full_path}')
                    self.error('Please check that all VCF and index files are present before retrying.')
                    raise FileNotFoundError(f'File not found: {full_path}')
                # Anything ending in 'tbi' is treated as the index; every other file as the VCF.
                if full_path.endswith('tbi'):
                    index_file = full_path
                else:
                    vcf_file = full_path
            if not index_file or not vcf_file:
                raise ValueError(f'VCF or index file is missing from metadata DB for analysis {analysis_accession}')
            submitted_vcfs.append(vcf_file)
            self.eload_cfg.set('brokering', 'vcf_files', vcf_file, 'index', value=index_file)
        self.eload_cfg.set('submission', 'vcf_files', value=submitted_vcfs)

    def get_hold_date(self):
        """Gets hold date from ENA and adds to the config.

        Posts a RECEIPT request for the project alias to the configured ENA submit URL and reads
        the ``holdUntilDate`` attribute of the first ``PROJECT`` element in the response.

        :raises ValueError: if the receipt has no ``PROJECT`` element or no ``holdUntilDate``.
        """
        # NOTE(review): the indentation inside this XML literal is reconstructed; whitespace
        # between elements is assumed to be insignificant to ENA - confirm.
        xml_request = f'''<SUBMISSION_SET>
    <SUBMISSION>
        <ACTIONS>
            <ACTION>
                <RECEIPT target="{self.project_alias}"/>
            </ACTION>
        </ACTIONS>
    </SUBMISSION>
</SUBMISSION_SET>'''
        response = requests.post(
            cfg.query('ena', 'submit_url'),
            auth=HTTPBasicAuth(cfg.query('ena', 'username'), cfg.query('ena', 'password')),
            files={'SUBMISSION': xml_request}
        )
        receipt = ET.fromstring(response.text)
        try:
            hold_date = receipt.findall('PROJECT')[0].attrib['holdUntilDate']
            self.eload_cfg.set('brokering', 'ena', 'hold_date', value=hold_date)
        except (IndexError, KeyError):
            raise ValueError(f"Couldn't get hold date from ENA for {self.project_accession} ({self.project_alias})")
            # TODO if there's no hold date because the study is already public, this should be okay
50 changes: 11 additions & 39 deletions eva_submission/eload_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import os
import shutil
import subprocess
from urllib.parse import urlsplit
from pathlib import Path

import yaml
from cached_property import cached_property
from ebi_eva_common_pyutils import command_utils
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.config_utils import get_mongo_uri_for_eva_profile, get_properties_from_xml_file
from ebi_eva_common_pyutils.config_utils import get_mongo_uri_for_eva_profile
from ebi_eva_common_pyutils.metadata_utils import get_variant_warehouse_db_name_from_assembly_and_taxonomy
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query, execute_query
import psycopg2
import pymongo
from pymongo.uri_parser import split_hosts

from eva_submission import ROOT_DIR
from eva_submission.assembly_taxonomy_insertion import insert_new_assembly_and_taxonomy
from eva_submission.eload_submission import Eload
from eva_submission.eload_utils import get_metadata_conn, get_mongo_creds, get_accession_pg_creds
from eva_submission.ingestion_templates import accession_props_template, variant_load_props_template

project_dirs = {
Expand All @@ -39,10 +37,9 @@ class EloadIngestion(Eload):

def __init__(self, eload_number):
super().__init__(eload_number)
self.settings_xml_file = cfg['maven']['settings_file']
self.project_accession = self.eload_cfg.query('brokering', 'ena', 'PROJECT')
self.project_dir = self.setup_project_dir()
self.mongo_uri = get_mongo_uri_for_eva_profile(cfg['maven']['environment'], self.settings_xml_file)
self.mongo_uri = get_mongo_uri_for_eva_profile(cfg['maven']['environment'], cfg['maven']['settings_file'])

def ingest(
self,
Expand Down Expand Up @@ -90,7 +87,6 @@ def check_brokering_done(self):
if self.eload_cfg.query('brokering', 'ena', 'hold_date') is None:
self.error('No release date found, check that brokering to ENA is done.')
raise ValueError('No release date found in submission config.')


def get_db_name(self):
"""
Expand All @@ -99,7 +95,7 @@ def get_db_name(self):
assm_accession = self.eload_cfg.query('submission', 'assembly_accession')
taxon_id = self.eload_cfg.query('submission', 'taxonomy_id')
# query EVAPRO for db name based on taxonomy id and accession
with self.get_pg_conn() as conn:
with get_metadata_conn() as conn:
db_name = get_variant_warehouse_db_name_from_assembly_and_taxonomy(conn, assm_accession, taxon_id)
if not db_name:
self.error(f'Database for taxonomy id {taxon_id} and assembly {assm_accession} not found in EVAPRO.')
Expand All @@ -118,7 +114,7 @@ def check_variant_db(self, db_name=None):
if not db_name:
db_name = self.get_db_name()
else:
with self.get_pg_conn() as conn:
with get_metadata_conn() as conn:
# warns but doesn't crash if assembly set already exists
insert_new_assembly_and_taxonomy(
assembly_accession=self.eload_cfg.query('submission', 'assembly_accession'),
Expand Down Expand Up @@ -187,33 +183,8 @@ def setup_project_dir(self):
self.eload_cfg.set(self.config_section, 'project_dir', value=str(project_dir))
return project_dir

def get_mongo_creds(self):
properties = get_properties_from_xml_file(cfg['maven']['environment'], self.settings_xml_file)
mongo_host = split_hosts(properties['eva.mongo.host'])[0][0]
mongo_user = properties['eva.mongo.user']
mongo_pass = properties['eva.mongo.passwd']
return mongo_host, mongo_user, mongo_pass

def get_pg_creds(self):
properties = get_properties_from_xml_file(cfg['maven']['environment'], self.settings_xml_file)
pg_url = properties['eva.evapro.jdbc.url']
pg_user = properties['eva.evapro.user']
pg_pass = properties['eva.evapro.password']
return pg_url, pg_user, pg_pass

def get_accession_pg_creds(self):
properties = get_properties_from_xml_file(cfg['maven']['environment'], self.settings_xml_file)
pg_url = properties['eva.accession.jdbc.url']
pg_user = properties['eva.accession.user']
pg_pass = properties['eva.accession.password']
return pg_url, pg_user, pg_pass

def get_pg_conn(self):
pg_url, pg_user, pg_pass = self.get_pg_creds()
return psycopg2.connect(urlsplit(pg_url).path, user=pg_user, password=pg_pass)

def get_study_name(self):
with self.get_pg_conn() as conn:
with get_metadata_conn() as conn:
query = f"SELECT title FROM evapro.project WHERE project_accession='{self.project_accession}';"
rows = get_all_results_for_query(conn, query)
if len(rows) != 1:
Expand All @@ -226,8 +197,8 @@ def get_vep_species(self):

def run_accession_workflow(self):
output_dir = self.create_nextflow_temp_output_directory(base=self.project_dir)
mongo_host, mongo_user, mongo_pass = self.get_mongo_creds()
pg_url, pg_user, pg_pass = self.get_accession_pg_creds()
mongo_host, mongo_user, mongo_pass = get_mongo_creds()
pg_url, pg_user, pg_pass = get_accession_pg_creds()
job_props = accession_props_template(
assembly_accession=self.eload_cfg.query('submission', 'assembly_accession'),
taxonomy_id=self.eload_cfg.query('submission', 'taxonomy_id'),
Expand Down Expand Up @@ -280,6 +251,7 @@ def run_variant_load_workflow(self):
output_dir = self.create_nextflow_temp_output_directory(base=self.project_dir)
job_props = variant_load_props_template(
project_accession=self.project_accession,
# TODO currently there is only ever one of these in the config, even if multiple analyses/files
analysis_accession=self.eload_cfg.query('brokering', 'ena', 'ANALYSIS'),
aggregation=self.eload_cfg.query(self.config_section, 'aggregation'),
study_name=self.get_study_name(),
Expand Down Expand Up @@ -326,7 +298,7 @@ def run_variant_load_workflow(self):
return output_dir

def insert_browsable_files(self):
with self.get_pg_conn() as conn:
with get_metadata_conn() as conn:
# insert into browsable file table, if files not already there
files_query = f"select file_id, filename from evapro.browsable_file " \
f"where project_accession = '{self.project_accession}';"
Expand Down Expand Up @@ -363,7 +335,7 @@ def insert_browsable_files(self):
execute_query(conn, ftp_update)

def refresh_study_browser(self):
with self.get_pg_conn() as conn:
with get_metadata_conn() as conn:
execute_query(conn, 'refresh materialized view study_browser;')

@cached_property
Expand Down
35 changes: 34 additions & 1 deletion eva_submission/eload_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import glob
import os
from urllib.parse import urlsplit

from ebi_eva_common_pyutils.assembly import NCBIAssembly
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.config_utils import get_properties_from_xml_file
from ebi_eva_common_pyutils.logger import logging_config as log_cfg
import psycopg2
from pymongo.uri_parser import split_hosts

logger = log_cfg.get_logger(__name__)

Expand Down Expand Up @@ -46,4 +50,33 @@ def get_file_content(file_path):

def cast_list(l, type_to_cast=str):
    """Lazily yield every element of ``l`` converted with ``type_to_cast``.

    :param l: iterable of elements to convert.
    :param type_to_cast: callable applied to each element (``str`` by default).
    :return: a generator producing exactly one converted value per input element.
    """
    for e in l:
        yield type_to_cast(e)


def get_metadata_creds():
    """Read the EVAPRO (metadata DB) credentials from the maven settings file.

    The settings file and the maven environment to use are both taken from the ``maven``
    section of the global config.

    :return: tuple ``(jdbc_url, user, password)``.
    """
    maven_properties = get_properties_from_xml_file(
        cfg['maven']['environment'],
        cfg['maven']['settings_file'],
    )
    return (
        maven_properties['eva.evapro.jdbc.url'],
        maven_properties['eva.evapro.user'],
        maven_properties['eva.evapro.password'],
    )


def get_metadata_conn():
    """Open a psycopg2 connection to the metadata DB with the credentials from get_metadata_creds().

    :return: the connection object returned by ``psycopg2.connect``.
    """
    jdbc_url, db_user, db_password = get_metadata_creds()
    # Only the path component of the configured URL is handed to psycopg2.
    # NOTE(review): presumably the URL looks like "jdbc:postgresql://...", so the path is the
    # part after the "jdbc:" scheme - confirm against the settings file.
    dsn = urlsplit(jdbc_url).path
    return psycopg2.connect(dsn, user=db_user, password=db_password)


def get_mongo_creds():
    """Read the MongoDB credentials from the maven settings file.

    The settings file and the maven environment to use are both taken from the ``maven``
    section of the global config.

    :return: tuple ``(host, user, password)``; only the first host listed in the
        ``eva.mongo.host`` property is returned.
    """
    maven_properties = get_properties_from_xml_file(
        cfg['maven']['environment'],
        cfg['maven']['settings_file'],
    )
    # split_hosts() returns one entry per configured host; keep the first element of the first entry.
    parsed_hosts = split_hosts(maven_properties['eva.mongo.host'])
    first_host = parsed_hosts[0][0]
    return (
        first_host,
        maven_properties['eva.mongo.user'],
        maven_properties['eva.mongo.passwd'],
    )


def get_accession_pg_creds():
    """Read the accessioning database credentials from the maven settings file.

    The settings file and the maven environment to use are both taken from the ``maven``
    section of the global config.

    :return: tuple ``(jdbc_url, user, password)``.
    """
    maven_properties = get_properties_from_xml_file(
        cfg['maven']['environment'],
        cfg['maven']['settings_file'],
    )
    return (
        maven_properties['eva.accession.jdbc.url'],
        maven_properties['eva.accession.user'],
        maven_properties['eva.accession.password'],
    )
1 change: 0 additions & 1 deletion eva_submission/eload_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def parse_vcf_check_report(self, vcf_check_report):
def _run_validation_workflow(self):
output_dir = self.create_nextflow_temp_output_directory()
validation_config = {
'metadata_file': self.eload_cfg.query('submission', 'metadata_spreadsheet'),
'vcf_files': self.eload_cfg.query('submission', 'vcf_files'),
'reference_fasta': self.eload_cfg.query('submission', 'assembly_fasta'),
'reference_report': self.eload_cfg.query('submission', 'assembly_report'),
Expand Down
Loading

0 comments on commit 37026ec

Please sign in to comment.