Exporting from targeted database tables
edgargaticaCU committed Mar 15, 2024
1 parent c1ec9aa commit fe92ae2
Showing 2 changed files with 174 additions and 30 deletions.
services.py — 89 changes: 66 additions &amp; 23 deletions
@@ -253,7 +253,7 @@ def get_score(row):
 
 
 def get_assertion_json(rows):
-    semmed_count = sum([row['semmed_flag'] for row in rows])
+    # semmed_count = sum([row['semmed_flag'] for row in rows])
     row1 = rows[0]
     supporting_publications = []
     for row in rows:
@@ -294,13 +294,13 @@ def get_assertion_json(rows):
             "attribute_source": "infores:pubmed"
         }
     ]
-    if semmed_count > 0:
-        attributes_list.append({
-            "attribute_type_id": "biolink:semmed_agreement_count",
-            "value": semmed_count,
-            "value_type_id": "SIO:000794",
-            "attribute_source": "infores:text-mining-provider-targeted"
-        })
+    # if semmed_count > 0:
+    #     attributes_list.append({
+    #         "attribute_type_id": "biolink:semmed_agreement_count",
+    #         "value": semmed_count,
+    #         "value_type_id": "SIO:000794",
+    #         "attribute_source": "infores:text-mining-provider-targeted"
+    #     })
     for row in rows:
         attributes_list.append(get_evidence_json(row))
     return json.dumps(attributes_list)
@@ -358,15 +358,15 @@ def get_evidence_json(row):
             "attribute_source": "infores:pubmed"
         }
     )
-    if row['semmed_flag'] == 1:
-        nested_attributes.append(
-            {
-                "attribute_type_id": "biolink:agrees_with_data_source",
-                "value": "infores:semmeddb",
-                "value_type_id": "biolink:InformationResource",
-                "attribute_source": "infores:text-mining-provider-targeted"
-            }
-        )
+    # if row['semmed_flag'] == 1:
+    #     nested_attributes.append(
+    #         {
+    #             "attribute_type_id": "biolink:agrees_with_data_source",
+    #             "value": "infores:semmeddb",
+    #             "value_type_id": "biolink:InformationResource",
+    #             "attribute_source": "infores:text-mining-provider-targeted"
+    #         }
+    #     )
     return {
         "attribute_type_id": "biolink:has_supporting_study_result",
         "value": f"tmkp:{row['evidence_id']}",
@@ -419,13 +419,15 @@ def get_edge(rows, predicate):
         object_aspect_qualifier = 'activity_or_abundance'
         object_direction_qualifier = 'decreased'
     elif predicate == 'biolink:gain_of_function_contributes_to':
-        predicate = 'biolink:affects'
-        qualified_predicate = 'biolink:contributes_to'
-        subject_form_or_variant_qualifier = 'gain_of_function_variant_form'
+        # predicate = 'biolink:affects'
+        # qualified_predicate = 'biolink:contributes_to'
+        # subject_form_or_variant_qualifier = 'gain_of_function_variant_form'
+        return None
     elif predicate == 'biolink:loss_of_function_contributes_to':
-        predicate = 'biolink:affects'
-        qualified_predicate = 'biolink:contributes_to'
-        subject_form_or_variant_qualifier = 'loss_of_function_variant_form'
+        # predicate = 'biolink:affects'
+        # qualified_predicate = 'biolink:contributes_to'
+        # subject_form_or_variant_qualifier = 'loss_of_function_variant_form'
+        return None
     return [sub, predicate, obj, qualified_predicate,
             subject_aspect_qualifier, subject_direction_qualifier,
             subject_part_qualifier, subject_form_or_variant_qualifier,
@@ -458,6 +460,47 @@ def write_edges(edge_dict, nodes, output_filename):
     logging.info(f'{len(skipped_assertions)} distinct assertions were skipped')
     logging.info("Edge output complete")
 
+def write_edges_gzip(edge_dict, nodes, output_filename):
+    logging.info("Starting edge output")
+    skipped_assertions = set([])
+    with gzip.open(output_filename, 'ab') as outfile:
+        for assertion, rows in edge_dict.items():
+            row1 = rows[0]
+            sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
+            obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+            if sub not in nodes or obj not in nodes:
+                continue
+            predicates = set([row['predicate_curie'] for row in rows])
+            for predicate in predicates:
+                edge = get_edge(rows, predicate)
+                if not edge:
+                    skipped_assertions.add(assertion)
+                    continue
+                line = b'\t'.join(bytes(str(val), encoding='utf-8') for val in edge) + b'\n'
+                throwaway_value = outfile.write(line)
+        outfile.flush()
+    logging.info(f'{len(skipped_assertions)} distinct assertions were skipped')
+    logging.info("Edge output complete")
+
+def generate_edges(edge_dict, nodes):
+    logging.info("Starting edge output")
+    skipped_assertions = set([])
+    for assertion, rows in edge_dict.items():
+        row1 = rows[0]
+        sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
+        obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+        if sub not in nodes or obj not in nodes:
+            continue
+        predicates = set([row['predicate_curie'] for row in rows])
+        for predicate in predicates:
+            edge = get_edge(rows, predicate)
+            if not edge:
+                skipped_assertions.add(assertion)
+                continue
+            yield '\t'.join(str(val) for val in edge) + '\n'
+    logging.info(f'{len(skipped_assertions)} distinct assertions were skipped')
+    logging.info("Edge output complete")
+
 
 def compress(infile, outfile):
     with open(infile, 'rb') as textfile:
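Note on the services.py additions: write_edges_gzip appends byte-encoded lines directly to a gzip archive, while generate_edges is a streaming counterpart that yields one tab-separated line per edge and lets the caller decide where the lines go. A minimal consumption sketch, not part of this commit (it assumes services is importable and that edge_dict and nodes have the shapes produced by create_edge_dict and the node export):

    import gzip

    import services  # the module changed in this commit

    def stream_edges_to_gzip(edge_dict, nodes, output_filename):
        # Drain the generator lazily so only one edge line is held in memory
        # at a time; 'at' appends text, matching write_edges_gzip's binary 'ab'.
        with gzip.open(output_filename, 'at', encoding='utf-8') as outfile:
            for line in services.generate_edges(edge_dict, nodes):
                outfile.write(line)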
targeted.py — 115 changes: 108 additions &amp; 7 deletions
@@ -75,7 +75,8 @@ def get_assertion_ids(session, limit=600000, offset=0):
         '(SELECT DISTINCT(assertion_id) '
         'FROM assertion_evidence_feedback af INNER JOIN evidence_feedback_answer ef INNER JOIN evidence e '
         'ON e.evidence_id = af.evidence_id '
-        'WHERE ef.prompt_text = \'Assertion Correct\' AND ef.response = 0) '
+        'INNER JOIN evidence_version ev ON ev.evidence_id = e.evidence_id '
+        'WHERE ef.prompt_text = \'Assertion Correct\' AND ef.response = 0 AND ev.version = 2) '
         'AND subject_curie NOT IN :ex1 AND object_curie NOT IN :ex2 '
         'ORDER BY assertion_id '
         'LIMIT :limit OFFSET :offset'
@@ -98,14 +99,12 @@ def get_edge_data(session: Session, id_list, chunk_size=1000, edge_limit=5) -> l
         'si.idf AS subject_idf, oi.idf AS object_idf, '
         'e.document_id, e.document_zone, e.document_year, e.score, '
         'e.sentence, e.subject_span, e.subject_text, e.object_span, e.object_text, '
-        '(SELECT COUNT(1) FROM top_unique_evidences t2 '
-        'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count, '
-        'IF(e.tm_id IS NULL, 0, 1) AS semmed_flag '
+        '(SELECT COUNT(1) FROM top_evidences t2 '
+        'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count '
         'FROM assertion a '
         'INNER JOIN LATERAL '
-        '(SELECT * FROM top_unique_evidences te LEFT JOIN tm_semmed ts ON ts.tm_id = te.evidence_id '
-        f'WHERE te.assertion_id = a.assertion_id ORDER BY ts.semmed_id IS NULL LIMIT {edge_limit}) AS e '
-        'ON a.assertion_id = e.assertion_id '
+        f'(SELECT * FROM top_evidences te WHERE te.assertion_id = a.assertion_id LIMIT {edge_limit}) AS e '
+        f'ON a.assertion_id = e.assertion_id '
         f'LEFT JOIN pr_to_uniprot su ON a.subject_curie = su.pr AND su.taxon = "{HUMAN_TAXON}" '
         f'LEFT JOIN pr_to_uniprot ou ON a.object_curie = ou.pr AND ou.taxon = "{HUMAN_TAXON}" '
         'LEFT JOIN concept_idf si ON a.subject_curie = si.concept_curie '
@@ -119,6 +118,77 @@
         yield [row for row in session.execute(main_query, {'ids': id_list[i:slice_end]})]
 
 
+def copy_assertion_records(session: Session) -> int:
+    logging.info('Starting assertion record copy')
+    session.execute(text('DELETE FROM targeted.assertion_2'))
+    logging.debug('Assertion records deleted')
+    insert_statement = text(
+        'INSERT INTO targeted.assertion_2 (assertion_id, subject_curie, object_curie, association_curie) '
+        'SELECT a.assertion_id '
+        ',IFNULL(u1.uniprot, a.subject_curie) AS subject_curie '
+        ',IFNULL(u2.uniprot, a.object_curie) AS object_curie '
+        ',a.association_curie '
+        'FROM assertion a '
+        'LEFT JOIN pr_to_uniprot u1 ON u1.pr = a.subject_curie AND u1.taxon = "NCBITaxon:9606" '
+        'LEFT JOIN pr_to_uniprot u2 ON u2.pr = a.object_curie AND u2.taxon = "NCBITaxon:9606" '
+    )
+    session.execute(insert_statement)
+    logging.debug('Insert statement executed, starting commit')
+    session.commit()
+    logging.debug('Insert complete')
+    value, = session.execute(text('SELECT COUNT(1) FROM targeted.assertion_2')).one()
+    # logging.info(f'{value} assertion records')
+    return value
+
+def copy_evidence_records(session: Session) -> int:
+    logging.info('Starting evidence record copy')
+    session.execute(text('DELETE FROM targeted.evidence_2'))
+    logging.debug('Evidence records deleted')
+    insert_statement = text(
+        'INSERT INTO targeted.evidence_2 (evidence_id, assertion_id, document_id, sentence, document_zone,'
+        ' document_publication_type, subject_span, subject_covered_text, object_span, object_covered_text,'
+        ' document_year_published, predicate_curie, score) '
+        'SELECT '
+        ' `e`.`evidence_id` AS `evidence_id` '
+        ',`e`.`assertion_id` AS `assertion_id` '
+        ',`e`.`document_id` AS `document_id` '
+        ',`e`.`sentence` AS `sentence` '
+        ',`e`.`document_zone` AS `document_zone` '
+        ',`e`.`document_publication_type` AS `document_publication_type` '
+        ',`se`.`span` AS `subject_span` '
+        ',`se`.`covered_text` AS `subject_covered_text` '
+        ',`oe`.`span` AS `object_span` '
+        ',`oe`.`covered_text` AS `object_covered_text` '
+        ',IFNULL(`dy`.`year`, `e`.`document_year_published`) AS `document_year_published` '
+        ',`t`.`predicate_curie` AS `predicate_curie` '
+        ',`t`.`score` AS `score` '
+        'FROM (((((`evidence` `e` '
+        'join `evidence_version` `ev` on((`e`.`evidence_id` = `ev`.`evidence_id`))) '
+        'join `entity` `se` on((`e`.`subject_entity_id` = `se`.`entity_id`))) '
+        'join `entity` `oe` on((`e`.`object_entity_id` = `oe`.`entity_id`))) '
+        'join `top_evidence_scores` `t` on((`e`.`evidence_id` = `t`.`evidence_id`))) '
+        'join `document_year` `dy` on((`e`.`document_id` = `dy`.`document_id`))) '
+        'WHERE ((`t`.`predicate_curie` <> "false") '
+        'and `e`.`superseded_by` IS NULL '
+        'and (`ev`.`version` = (SELECT MAX(version) FROM evidence_version)) '
+        'and `e`.`evidence_id` in ( '
+        'select `assertion_evidence_feedback`.`evidence_id` '
+        'from (`assertion_evidence_feedback` '
+        'join `evidence_feedback_answer` '
+        'on((`evidence_feedback_answer`.`feedback_id` = `assertion_evidence_feedback`.`id`))) '
+        'where ((`evidence_feedback_answer`.`prompt_text` = "Assertion Correct") '
+        'and (`evidence_feedback_answer`.`response` = 0)) '
+        ') is false)'
+    )
+    session.execute(insert_statement)
+    logging.debug('Insert statement executed, starting commit')
+    session.commit()
+    logging.debug('Insert complete')
+    value, = session.execute(text('SELECT COUNT(1) FROM targeted.evidence_2')).one()
+    # logging.info(value)
+    return value
+
+
 def get_superseded_chunk(session: Session) -> list[tuple[str, str]]:
     query_text = text("""
     SELECT e1.evidence_id, e2.document_id
@@ -179,6 +249,29 @@ def create_edge_dict(edge_data):
     return edge_dict
 
 
+def uniquify_edge_dict(edge_dict):
+    for assertion_id in edge_dict.keys():
+        evidence_list = edge_dict[assertion_id]
+        if len(evidence_list) < 2:
+            continue
+        index_dict = {}
+        score_dict = {}
+        for ev in evidence_list:
+            composite_key = f"{ev['assertion_id']}_{ev['evidence_id']}_{ev['document_id']}_{ev['sentence']}"
+            score = float(ev['score'])
+            if composite_key in index_dict.keys() and composite_key in score_dict.keys():
+                if score > score_dict[composite_key]:
+                    index_dict[composite_key] = evidence_list.index(ev)
+                    score_dict[composite_key] = score
+            else:
+                index_dict[composite_key] = evidence_list.index(ev)
+                score_dict[composite_key] = score
+        new_evidence_list = []
+        for index in index_dict.values():
+            new_evidence_list.append(evidence_list[index])
+        edge_dict[assertion_id] = new_evidence_list
+
+
 def export_nodes(session: Session, bucket: str, blob_prefix: str):
     (node_curies, normal_dict) = get_node_data(session, use_uniprot=True)
     node_metadata = write_nodes(node_curies, normal_dict, 'nodes.tsv.gz')
Expand All @@ -205,5 +298,13 @@ def export_edges(session: Session, nodes: set, bucket: str, blob_prefix: str,
for rows in get_edge_data(session, id_list, chunk_size, edge_limit):
logging.info(f'Processing the next {len(rows)} rows')
edge_dict = create_edge_dict(rows)
uniquify_edge_dict(edge_dict)
services.write_edges(edge_dict, nodes, output_filename)
services.upload_to_gcp(bucket, output_filename, f'{blob_prefix}{output_filename}')


def export_edges_new(session: Session):
assertion_count = copy_assertion_records(session)
evidence_count = copy_evidence_records(session)
logging.info(f'Inserted {assertion_count} assertion records and {evidence_count} evidence records')

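Note on the targeted.py additions: uniquify_edge_dict collapses evidence rows that share the composite (assertion_id, evidence_id, document_id, sentence) key, keeping only the highest-scoring row for each key. A toy illustration with made-up values (real rows are SQLAlchemy result mappings, but plain dicts exercise the same logic; assumes uniquify_edge_dict from targeted.py is in scope):

    edge_dict = {
        123: [
            {'assertion_id': 123, 'evidence_id': 'e1', 'document_id': 'PMID:1',
             'sentence': 'A regulates B.', 'score': '0.81'},
            {'assertion_id': 123, 'evidence_id': 'e1', 'document_id': 'PMID:1',
             'sentence': 'A regulates B.', 'score': '0.93'},  # same key, higher score
        ]
    }
    uniquify_edge_dict(edge_dict)
    # Only the higher-scoring duplicate survives.
    assert edge_dict[123] == [{'assertion_id': 123, 'evidence_id': 'e1',
                               'document_id': 'PMID:1',
                               'sentence': 'A regulates B.', 'score': '0.93'}]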