Skip to content

Commit

Permalink
added improved vcf generation and list download
Browse files Browse the repository at this point in the history
  • Loading branch information
MarvinDo committed Aug 23, 2024
1 parent b98a925 commit 72a7c76
Show file tree
Hide file tree
Showing 20 changed files with 689 additions and 111 deletions.
6 changes: 6 additions & 0 deletions src/annotation_service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ def process_one_request(annotation_queue_id, job_config = get_default_job_config
conn.set_annotation_queue_status(annotation_queue_id, status="progress")
print("processing request " + str(annotation_queue_id) + " annotating variant: " + "-".join([variant.chrom, str(variant.pos), variant.ref, variant.alt]) + " with id: " + str(variant.id) )

# invalidate variant list download vcf files
list_ids = conn.get_list_ids_with_variant(variant_id)
for list_id in list_ids:
download_queue_ids = conn.get_valid_download_queue_ids(list_id, "list_download")
for download_queue_id in download_queue_ids:
conn.invalidate_download_queue(download_queue_id)

# write a vcf with the current variant to disc
vcf_path = get_temp_vcf_path(annotation_queue_id)
Expand Down
100 changes: 96 additions & 4 deletions src/common/db_IO.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,8 @@ def is_hgnc(self, string):
def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, user_id,
ranges = None, genes = None, consensus = None, user = None, automatic_splicing = None, automatic_protein = None,
hgvs = None, variant_ids_oi = None, external_ids = None, cdna_ranges = None, annotation_restrictions = None,
include_heredicare_consensus = False, variant_strings = None, variant_types = None):
include_heredicare_consensus = False, variant_strings = None, variant_types = None, clinvar_upload_states = None,
heredicare_upload_states = None):
# get one page of variants determined by offset & pagesize

prefix = "SELECT id, chr, pos, ref, alt FROM variant"
Expand Down Expand Up @@ -964,6 +965,30 @@ def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, use
actual_information += tuple(automatic_without_dash)
new_constraints = "id IN (" + new_constraints_inner + ")"
postfix = self.add_constraints_to_command(postfix, new_constraints)
if clinvar_upload_states is not None and len(clinvar_upload_states) > 0:
new_constraints_inner = """
SELECT variant_id FROM publish_clinvar_queue WHERE id IN (
SELECT MAX(id) FROM publish_clinvar_queue WHERE status != 'skipped' GROUP BY variant_id
) AND status IN
"""
placeholders = self.get_placeholders(len(clinvar_upload_states))
new_constraints_inner += placeholders
new_constraints_inner = functions.enbrace(new_constraints_inner)
new_constraints = "id IN " + new_constraints_inner
postfix = self.add_constraints_to_command(postfix, new_constraints)
actual_information += tuple(clinvar_upload_states)
if heredicare_upload_states is not None and len(heredicare_upload_states) > 0:
new_constraints_inner = """
SELECT DISTINCT variant_id FROM publish_heredicare_queue WHERE id IN (
SELECT MAX(id) FROM publish_heredicare_queue WHERE status != 'skipped' GROUP BY vid
) AND status IN
"""
placeholders = self.get_placeholders(len(heredicare_upload_states))
new_constraints_inner += placeholders
new_constraints_inner = functions.enbrace(new_constraints_inner)
new_constraints = "id IN " + new_constraints_inner
postfix = self.add_constraints_to_command(postfix, new_constraints)
actual_information += tuple(heredicare_upload_states)
if hgvs is not None and len(hgvs) > 0:
all_variants = []
for hgvs_string in hgvs:
Expand Down Expand Up @@ -1060,7 +1085,7 @@ def get_variants_page_merged(self, page, page_size, sort_by, include_hidden, use
offset = (page - 1) * page_size
command = command + " LIMIT %s, %s"
actual_information += (offset, page_size)
#print(command % actual_information)
print(command % actual_information)
self.cursor.execute(command, actual_information)
variants_raw = self.cursor.fetchall()

Expand Down Expand Up @@ -2214,6 +2239,12 @@ def get_variant_ids_from_list(self, list_id):
result = [str(x[0]) for x in result] # extract variant_id
return result

def get_list_ids_with_variant(self, variant_id):
    """Return the ids of all variant lists that contain the given variant."""
    query = "SELECT DISTINCT list_id FROM list_variants WHERE variant_id = %s"
    self.cursor.execute(query, (variant_id, ))
    return [list_id for (list_id, ) in self.cursor.fetchall()]

# list_id to get the right list
# list_name is the value which will be updated
def update_user_variant_list(self, list_id, new_list_name, public_read, public_edit):
Expand Down Expand Up @@ -3323,7 +3354,8 @@ def clear_heredicare_annotation(self, variant_id):
self.conn.commit()

def get_enumtypes(self, tablename, columnname):
allowed_tablenames = ["consensus_classification", "user_classification", "variant", "annotation_queue", "automatic_classification", "sv_variant", "user_classification_criteria_applied", "consensus_classification_criteria_applied", "import_variant_queue"]
allowed_tablenames = ["consensus_classification", "user_classification", "variant", "annotation_queue", "automatic_classification", "sv_variant",
"user_classification_criteria_applied", "consensus_classification_criteria_applied", "import_variant_queue", "publish_heredicare_queue"]
if tablename in allowed_tablenames: # prevent sql injection
command = "SHOW COLUMNS FROM " + tablename + " WHERE FIELD = %s"
else:
Expand Down Expand Up @@ -3887,6 +3919,12 @@ def get_clinvar_queue_entries(self, publish_queue_ids: list, variant_id):
self.cursor.execute(command, actual_information)
result = self.cursor.fetchall()
return result

def get_unique_publish_clinvar_queue_status(self):
    """Return the distinct status values present in publish_clinvar_queue, excluding 'skipped'."""
    query = "SELECT DISTINCT status FROM publish_clinvar_queue WHERE status != 'skipped'"
    self.cursor.execute(query)
    return [status for (status, ) in self.cursor.fetchall()]

# DEPRECATED: delete later
#def check_publish_queue_id(self, publish_queue_id):
Expand Down Expand Up @@ -4065,4 +4103,58 @@ def insert_classification_final_class(self, classification_scheme_id, final_clas
SELECT %s, %s FROM DUAL WHERE NOT EXISTS (SELECT * FROM classification_final_class
WHERE `classification_scheme_id`=%s AND `classification`=%s LIMIT 1)"""
self.cursor.execute(command, (classification_scheme_id, final_class, classification_scheme_id, final_class))
self.conn.commit()
self.conn.commit()




def insert_download_request(self, list_id, request_type, filename):
    """Queue a new download request and return the id of the inserted row.

    Parameters:
        list_id: identifier the download belongs to (stored in `identifier`)
        request_type: value for the `type` column (e.g. "list_download")
        filename: target filename for the generated download

    Uses cursor.lastrowid (DB-API 2.0) to return the id of the row this
    cursor just inserted. The previous implementation re-queried
    MAX(id) for (identifier, type), which could race with a concurrent
    insert for the same list and return the wrong queue id.
    """
    command = "INSERT INTO download_queue (identifier, type, filename) VALUES (%s, %s, %s)"
    self.cursor.execute(command, (list_id, request_type, filename))
    self.conn.commit()
    return self.cursor.lastrowid

def get_most_recent_download_queue_id(self, list_id, request_type):
    """Return the highest id of a still-valid download_queue entry for this
    list and request type (None if there is none)."""
    query = "SELECT MAX(id) FROM download_queue WHERE identifier = %s AND type = %s AND is_valid = 1"
    self.cursor.execute(query, (list_id, request_type))
    (newest_id, ) = self.cursor.fetchone()
    return newest_id

def update_download_queue_celery_task_id(self, download_queue_id, celery_task_id):
    """Attach the celery task id to an existing download_queue entry."""
    query = "UPDATE download_queue SET celery_task_id = %s WHERE id = %s"
    params = (celery_task_id, download_queue_id)
    self.cursor.execute(query, params)
    self.conn.commit()

def close_download_queue(self, status, download_queue_id, message):
    """Finalize a download_queue entry: set its terminal status and message
    and stamp the finish time."""
    query = "UPDATE download_queue SET status = %s, finished_at = NOW(), message = %s WHERE id = %s"
    params = (status, message, download_queue_id)
    self.cursor.execute(query, params)
    self.conn.commit()

def update_download_queue_status(self, download_queue_id, status, message):
    """Update status and message of a download_queue entry without closing it."""
    query = "UPDATE download_queue SET status = %s, message = %s WHERE id = %s"
    params = (status, message, download_queue_id)
    self.cursor.execute(query, params)
    self.conn.commit()

def get_download_queue(self, download_queue_id, minimal_info = False):
    """Fetch a single download_queue row by id.

    Returns None when no id is given. With minimal_info=True only the
    status-related columns are selected.
    """
    if download_queue_id is None:
        return None
    if minimal_info:
        columns = "requested_at, status, finished_at, message, is_valid"
    else:
        columns = "requested_at, status, finished_at, message, is_valid, filename, identifier, type, celery_task_id"
    self.cursor.execute("SELECT " + columns + " FROM download_queue WHERE id = %s", (download_queue_id, ))
    return self.cursor.fetchone()

def get_valid_download_queue_ids(self, identifier, request_type):
    """Return the ids of all still-valid download_queue entries for this
    identifier and request type."""
    query = "SELECT id FROM download_queue WHERE identifier = %s AND type = %s AND is_valid = 1"
    self.cursor.execute(query, (identifier, request_type))
    return [entry_id for (entry_id, ) in self.cursor.fetchall()]

def invalidate_download_queue(self, download_queue_id):
    """Mark a download_queue entry as no longer valid (e.g. its cached file is stale)."""
    query = "UPDATE download_queue SET is_valid = 0 WHERE id = %s"
    self.cursor.execute(query, (download_queue_id, ))
    self.conn.commit()

26 changes: 19 additions & 7 deletions src/common/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
import uuid
from functools import cmp_to_key
import pathlib
import werkzeug




def is_secure_filename(filename):
    """Return True if *filename* is left unchanged by werkzeug's
    secure_filename sanitizer, i.e. it is already safe to use on disc."""
    sanitized = werkzeug.utils.secure_filename(filename)
    return sanitized == filename

def prettyprint_json(json_obj, func = print):
pretty_json = json.dumps(json_obj, indent=2)
Expand Down Expand Up @@ -168,11 +175,12 @@ def convert_none_infinite(x):
else:
return x

def execute_command(command, process_name, use_prefix_error_log = True):
completed_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
std_out, std_err = completed_process.communicate()#[1].strip().decode("utf-8") # catch errors and warnings and convert to str
def execute_command(command, process_name, use_prefix_error_log = True, stdout=subprocess.PIPE):
completed_process = subprocess.Popen(command, stdout=stdout, stderr=subprocess.PIPE)
command_output, std_err = completed_process.communicate()#[1].strip().decode("utf-8") # catch errors and warnings and convert to str
std_err = std_err.strip().decode("utf-8")
command_output = std_out.strip().decode("utf-8")
if command_output is not None:
command_output = command_output.strip().decode("utf-8")
err_msg = ""
if completed_process.returncode != 0:
if use_prefix_error_log:
Expand Down Expand Up @@ -672,9 +680,9 @@ def enpercent(string):
return '%' + string + '%'


def get_random_temp_file(fileending, filename_ext = ""):
def get_random_temp_file(fileending, filename_ext = "", folder = None):
    """Build a random (uuid4-based) file path with the given file ending.

    Parameters:
        fileending: file extension (leading/trailing dots are stripped)
        filename_ext: optional extra text appended to the random name
        folder: directory for the file; defaults to the system temp
            directory. A None sentinel is used instead of calling
            tempfile.gettempdir() in the signature, because a default
            argument is evaluated only once at import time and would
            ignore later changes to the temp directory (e.g. via TMPDIR).
    """
    if folder is None:
        folder = tempfile.gettempdir()
    filename = collect_info(str(uuid.uuid4()), "", filename_ext, sep = '_')
    return os.path.join(folder, filename + "." + str(fileending.strip('.'))).strip('.')

def rm(path):
if os.path.exists(path):
Expand Down Expand Up @@ -970,4 +978,8 @@ def one_to_three_letter(s):
if s == "Y": return "Tyr"
if s == "V": return "Val"
if s == "X": return "Ter"
return "-"
return "-"




1 change: 1 addition & 0 deletions src/common/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def joinpaths(path, *paths):
# directories derived from the application working directory
resources_dir = joinpaths(workdir, 'resources')
logs_dir = joinpaths(workdir, 'logs')
downloads_dir = joinpaths(workdir, 'downloads')
# generated vcf downloads of user variant lists are stored here
download_variant_list_dir = joinpaths(downloads_dir, 'variant_lists')
# classified variants folders are calculated in tasks.py
#report_dir = joinpaths(workdir, "") #'downloads/consensus_classification_reports/'

Expand Down
93 changes: 68 additions & 25 deletions src/frontend_celery/webapp/download/download_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ def generate_consensus_only_vcf(variant_types, dummy = False):
variant_ids_oi = []
if not dummy:
variant_ids_oi = conn.get_variant_ids_with_consensus_classification(variant_types = variant_types)
vcf_file_buffer, status, vcf_errors, err_msg = get_vcf(variant_ids_oi, conn, get_variant_vcf_line_only_consensus, check_vcf=False)
if status == "error":
raise IOError("There was an error during vcf generation: " + str(vcf_errors) + "; " + err_msg)
functions.buffer_to_file_system(vcf_file_buffer, path_to_download)
status, message = write_vcf_file(variant_ids_oi, path_to_download, conn, get_variant_vcf_line_only_consensus, check_vcf = False)
conn.close()

if status == "error":
raise IOError("There was an error during vcf generation: " + str(message))

with open(last_dump_path, 'w') as last_dump_file:
last_dump_file.write(last_dump_date)

Expand Down Expand Up @@ -121,26 +120,70 @@ def get_vcf(variants_oi, conn, worker=get_variant_vcf_line, check_vcf=True):
return buffer, status, "", ""


#import time
#def test_large_download():
# for i in range(50):
# yield str(i).encode()
# print(i)
# time.sleep(1)
#
#
def get_vcf_stream(variants_oi, conn, worker=get_variant_vcf_line):
for id in variants_oi:
info_headers, variant_vcf = worker(id, conn)
yield variant_vcf
#all_variant_vcf_lines.append(variant_vcf)
#final_info_headers = merge_info_headers(final_info_headers, info_headers)

#printable_info_headers = list(final_info_headers.values())
#printable_info_headers.sort()
#functions.write_vcf_header(printable_info_headers, lambda l: yield_something(l.encode()), tail='\n')
def yield_something(val):
    # Minimal generator that yields a single value; lets a plain value be
    # passed wherever an iterable/stream is expected.
    yield val

def write_vcf_file(variant_ids, path, conn: Connection, worker=get_variant_vcf_line, check_vcf = True):
    """Write a vcf file for the given variant ids to *path*.

    get_vcf_stream yields the file content in reverse order (body lines
    first, header last), so the file is first written reversed and then
    flipped line-wise with the external `tac` command.

    Returns a (status, message) tuple: status is "success" or "error",
    message collects any error output.
    """
    status = "success"
    message = ""

    # write file in reverse to disc
    generator = get_vcf_stream(variant_ids, conn, worker)
    with open(path, "w") as file:
        for line in generator:
            file.write(line)

    # reverse vcf: `tac` prints the file with line order inverted
    # NOTE(review): `tac` is a GNU coreutils tool — not available on e.g.
    # macOS/BSD by default; confirm the deployment environment provides it.
    path2 = path + ".rev"
    command = ["tac", path]
    with open(path2, "w") as f:
        returncode, serr, sout = functions.execute_command(command, "tac", stdout = f)
        if returncode != 0:
            # reversing failed: clean up the partial .rev file and report
            status = "error"
            message = functions.collect_info(message, "", serr)
            message = functions.collect_info(message, "", sout)
            functions.rm(path2)
            return status, message
    # replace the reversed intermediate with the final file
    functions.rm(path)
    returncode, err_msg, command_output = functions.execute_command(["mv", path2, path], "mv")
    if returncode != 0: return "error", err_msg

    # check vcf validity (optional, e.g. skipped for partial dumps)
    if check_vcf:
        returncode, err_msg, vcf_errors = functions.check_vcf(path)
        if returncode != 0:
            status = "error"
            message = functions.collect_info(message, "", err_msg)
            message = functions.collect_info(message, "", vcf_errors)
            return status, message

    return status, message


# this is a generator to get a vcf file in reverse order
def get_vcf_stream(variant_ids, conn, worker=get_variant_vcf_line):
final_info_headers = {}
for variant_id in variant_ids:
info_headers, variant_vcf = worker(variant_id, conn)
yield variant_vcf + "\n"
final_info_headers = merge_info_headers(final_info_headers, info_headers)

printable_info_headers = list(final_info_headers.values())
printable_info_headers.sort()
buffer = io.StringIO()
functions.write_vcf_header(printable_info_headers, lambda l: buffer.write(l), tail='\n')
buffer.seek(0)
for line in reversed(list(buffer)):
yield line


def stream_file(file_path, remove_after = False):
    """Generator yielding a file line by line (for streaming responses).

    Parameters:
        file_path: path of the file to stream
        remove_after: if True, delete the file after it was fully streamed

    Fix over the previous version: the file handle is managed with a
    context manager, so it is closed even when the consumer abandons the
    generator before exhausting it (the bare open()/close() pair leaked
    the handle in that case).
    """
    with open(file_path, 'r') as file:
        yield from file
    if remove_after:
        os.remove(file_path)






Expand Down
Loading

0 comments on commit 72a7c76

Please sign in to comment.