diff --git a/services.py b/services.py
index e100dcf..f6da8e2 100644
--- a/services.py
+++ b/services.py
@@ -253,7 +253,7 @@ def get_score(row):
 
 
 def get_assertion_json(rows):
-    semmed_count = sum([row['semmed_flag'] for row in rows])
+    # semmed_count = sum([row['semmed_flag'] for row in rows])
     row1 = rows[0]
     supporting_publications = []
     for row in rows:
@@ -294,13 +294,13 @@ def get_assertion_json(rows):
             "attribute_source": "infores:pubmed"
         }
     ]
-    if semmed_count > 0:
-        attributes_list.append({
-            "attribute_type_id": "biolink:semmed_agreement_count",
-            "value": semmed_count,
-            "value_type_id": "SIO:000794",
-            "attribute_source": "infores:text-mining-provider-targeted"
-        })
+    # if semmed_count > 0:
+    #     attributes_list.append({
+    #         "attribute_type_id": "biolink:semmed_agreement_count",
+    #         "value": semmed_count,
+    #         "value_type_id": "SIO:000794",
+    #         "attribute_source": "infores:text-mining-provider-targeted"
+    #     })
     for row in rows:
         attributes_list.append(get_evidence_json(row))
     return json.dumps(attributes_list)
@@ -358,15 +358,15 @@ def get_evidence_json(row):
                 "attribute_source": "infores:pubmed"
             }
         )
-    if row['semmed_flag'] == 1:
-        nested_attributes.append(
-            {
-                "attribute_type_id": "biolink:agrees_with_data_source",
-                "value": "infores:semmeddb",
-                "value_type_id": "biolink:InformationResource",
-                "attribute_source": "infores:text-mining-provider-targeted"
-            }
-        )
+    # if row['semmed_flag'] == 1:
+    #     nested_attributes.append(
+    #         {
+    #             "attribute_type_id": "biolink:agrees_with_data_source",
+    #             "value": "infores:semmeddb",
+    #             "value_type_id": "biolink:InformationResource",
+    #             "attribute_source": "infores:text-mining-provider-targeted"
+    #         }
+    #     )
     return {
         "attribute_type_id": "biolink:has_supporting_study_result",
         "value": f"tmkp:{row['evidence_id']}",
@@ -419,13 +419,15 @@ def get_edge(rows, predicate):
         object_aspect_qualifier = 'activity_or_abundance'
         object_direction_qualifier = 'decreased'
     elif predicate == 'biolink:gain_of_function_contributes_to':
-        predicate = 'biolink:affects'
-        qualified_predicate = 'biolink:contributes_to'
-        subject_form_or_variant_qualifier = 'gain_of_function_variant_form'
+        # predicate = 'biolink:affects'
+        # qualified_predicate = 'biolink:contributes_to'
+        # subject_form_or_variant_qualifier = 'gain_of_function_variant_form'
+        return None
     elif predicate == 'biolink:loss_of_function_contributes_to':
-        predicate = 'biolink:affects'
-        qualified_predicate = 'biolink:contributes_to'
-        subject_form_or_variant_qualifier = 'loss_of_function_variant_form'
+        # predicate = 'biolink:affects'
+        # qualified_predicate = 'biolink:contributes_to'
+        # subject_form_or_variant_qualifier = 'loss_of_function_variant_form'
+        return None
     return [sub, predicate, obj,
             qualified_predicate, subject_aspect_qualifier, subject_direction_qualifier,
             subject_part_qualifier, subject_form_or_variant_qualifier,
@@ -458,6 +460,47 @@
     logging.info(f'{len(skipped_assertions)} distinct assertions were skipped')
     logging.info("Edge output complete")
 
+
+def write_edges_gzip(edge_dict, nodes, output_filename):
+    logging.info("Starting edge output")
+    skipped_assertions = set([])
+    with gzip.open(output_filename, 'ab') as outfile:
+        for assertion, rows in edge_dict.items():
+            row1 = rows[0]
+            sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
+            obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+            if sub not in nodes or obj not in nodes:
+                continue
+            predicates = set([row['predicate_curie'] for row in rows])
+            for predicate in predicates:
+                edge = get_edge(rows, predicate)
+                if not edge:
+                    skipped_assertions.add(assertion)
+                    continue
+                line = b'\t'.join(bytes(str(val), encoding='utf-8') for val in edge) + b'\n'
+                outfile.write(line)
+                outfile.flush()
+    logging.info(f'{len(skipped_assertions)} distinct assertions were skipped')
+    logging.info("Edge output complete")
+
+def generate_edges(edge_dict, nodes):
+    logging.info("Starting edge output")
+    skipped_assertions = set([])
+    for assertion, rows in edge_dict.items():
+        row1 = rows[0]
+        sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
+        obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+        if sub not in nodes or obj not in nodes:
+            continue
+        predicates = set([row['predicate_curie'] for row in rows])
+        for predicate in predicates:
+            edge = get_edge(rows, predicate)
+            if not edge:
+                skipped_assertions.add(assertion)
+                continue
+            yield '\t'.join(str(val) for val in edge) + '\n'
+    logging.info(f'{len(skipped_assertions)} distinct assertions were skipped')
+    logging.info("Edge output complete")
+
 def compress(infile, outfile):
     with open(infile, 'rb') as textfile:
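
A minimal usage sketch for the new generate_edges generator (not part of the patch; the stream_edges wrapper and filename are illustrative, and edge_dict/nodes are assumed to come from create_edge_dict and the node export):

import gzip

import services  # the module patched above


def stream_edges(edge_dict, nodes, output_filename='edges.tsv.gz'):
    # 'wt' opens the gzip member in text mode, so the generator's
    # ready-made tab-delimited lines need no manual encoding.
    with gzip.open(output_filename, 'wt', encoding='utf-8') as outfile:
        for line in services.generate_edges(edge_dict, nodes):
            outfile.write(line)
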
diff --git a/targeted.py b/targeted.py
index e1538d2..23030d1 100644
--- a/targeted.py
+++ b/targeted.py
@@ -75,7 +75,8 @@ def get_assertion_ids(session, limit=600000, offset=0):
         '(SELECT DISTINCT(assertion_id) '
         'FROM assertion_evidence_feedback af INNER JOIN evidence_feedback_answer ef INNER JOIN evidence e '
         'ON e.evidence_id = af.evidence_id '
-        'WHERE ef.prompt_text = \'Assertion Correct\' AND ef.response = 0) '
+        'INNER JOIN evidence_version ev ON ev.evidence_id = e.evidence_id '
+        'WHERE ef.prompt_text = \'Assertion Correct\' AND ef.response = 0 AND ev.version = 2) '
         'AND subject_curie NOT IN :ex1 AND object_curie NOT IN :ex2 '
         'ORDER BY assertion_id '
         'LIMIT :limit OFFSET :offset'
@@ -98,14 +99,12 @@ def get_edge_data(session: Session, id_list, chunk_size=1000, edge_limit=5) -> l
         'si.idf AS subject_idf, oi.idf AS object_idf, '
         'e.document_id, e.document_zone, e.document_year, e.score, '
         'e.sentence, e.subject_span, e.subject_text, e.object_span, e.object_text, '
-        '(SELECT COUNT(1) FROM top_unique_evidences t2 '
-        'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count, '
-        'IF(e.tm_id IS NULL, 0, 1) AS semmed_flag '
+        '(SELECT COUNT(1) FROM top_evidences t2 '
+        'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count '
         'FROM assertion a '
         'INNER JOIN LATERAL '
-        '(SELECT * FROM top_unique_evidences te LEFT JOIN tm_semmed ts ON ts.tm_id = te.evidence_id '
-        f'WHERE te.assertion_id = a.assertion_id ORDER BY ts.semmed_id IS NULL LIMIT {edge_limit}) AS e '
-        'ON a.assertion_id = e.assertion_id '
+        f'(SELECT * FROM top_evidences te WHERE te.assertion_id = a.assertion_id LIMIT {edge_limit}) AS e '
+        'ON a.assertion_id = e.assertion_id '
         f'LEFT JOIN pr_to_uniprot su ON a.subject_curie = su.pr AND su.taxon = "{HUMAN_TAXON}" '
         f'LEFT JOIN pr_to_uniprot ou ON a.object_curie = ou.pr AND ou.taxon = "{HUMAN_TAXON}" '
         'LEFT JOIN concept_idf si ON a.subject_curie = si.concept_curie '
@@ -119,6 +118,77 @@ def get_edge_data(session: Session, id_list, chunk_size=1000, edge_limit=5) -> l
             yield [row for row in session.execute(main_query, {'ids': id_list[i:slice_end]})]
 
 
+def copy_assertion_records(session: Session) -> int:
+    logging.info('Starting assertion record copy')
+    session.execute(text('DELETE FROM targeted.assertion_2'))
+    logging.debug('Assertion records deleted')
+    insert_statement = text(
+        'INSERT INTO targeted.assertion_2 (assertion_id, subject_curie, object_curie, association_curie) '
+        'SELECT a.assertion_id '
+        ',IFNULL(u1.uniprot, a.subject_curie) AS subject_curie '
+        ',IFNULL(u2.uniprot, a.object_curie) AS object_curie '
+        ',a.association_curie '
+        'FROM assertion a '
+        'LEFT JOIN pr_to_uniprot u1 ON u1.pr = a.subject_curie AND u1.taxon = "NCBITaxon:9606" '
+        'LEFT JOIN pr_to_uniprot u2 ON u2.pr = a.object_curie AND u2.taxon = "NCBITaxon:9606" '
+    )
+    session.execute(insert_statement)
+    logging.debug('Insert statement executed, starting commit')
+    session.commit()
+    logging.debug('Insert complete')
+    value, = session.execute(text('SELECT COUNT(1) FROM targeted.assertion_2')).one()
+    # logging.info(f'{value} assertion records')
+    return value
+
+def copy_evidence_records(session: Session) -> int:
+    logging.info('Starting evidence record copy')
+    session.execute(text('DELETE FROM targeted.evidence_2'))
+    logging.debug('Evidence records deleted')
+    insert_statement = text(
+        'INSERT INTO targeted.evidence_2 (evidence_id, assertion_id, document_id, sentence, document_zone,'
+        ' document_publication_type, subject_span, subject_covered_text, object_span, object_covered_text,'
+        ' document_year_published, predicate_curie, score) '
+        'SELECT '
+        ' `e`.`evidence_id` AS `evidence_id` '
+        ',`e`.`assertion_id` AS `assertion_id` '
+        ',`e`.`document_id` AS `document_id` '
+        ',`e`.`sentence` AS `sentence` '
+        ',`e`.`document_zone` AS `document_zone` '
+        ',`e`.`document_publication_type` AS `document_publication_type` '
+        ',`se`.`span` AS `subject_span` '
+        ',`se`.`covered_text` AS `subject_covered_text` '
+        ',`oe`.`span` AS `object_span` '
+        ',`oe`.`covered_text` AS `object_covered_text` '
+        ',IFNULL(`dy`.`year`, `e`.`document_year_published`) AS `document_year_published` '
+        ',`t`.`predicate_curie` AS `predicate_curie` '
+        ',`t`.`score` AS `score` '
+        'FROM (((((`evidence` `e` '
+        'join `evidence_version` `ev` on((`e`.`evidence_id` = `ev`.`evidence_id`))) '
+        'join `entity` `se` on((`e`.`subject_entity_id` = `se`.`entity_id`))) '
+        'join `entity` `oe` on((`e`.`object_entity_id` = `oe`.`entity_id`))) '
+        'join `top_evidence_scores` `t` on((`e`.`evidence_id` = `t`.`evidence_id`))) '
+        'join `document_year` `dy` on((`e`.`document_id` = `dy`.`document_id`))) '
+        'WHERE ((`t`.`predicate_curie` <> "false") '
+        'and `e`.`superseded_by` IS NULL '
+        'and (`ev`.`version` = (SELECT MAX(version) FROM evidence_version)) '
+        'and `e`.`evidence_id` in ( '
+        'select `assertion_evidence_feedback`.`evidence_id` '
+        'from (`assertion_evidence_feedback` '
+        'join `evidence_feedback_answer` '
+        'on((`evidence_feedback_answer`.`feedback_id` = `assertion_evidence_feedback`.`id`))) '
+        'where ((`evidence_feedback_answer`.`prompt_text` = "Assertion Correct") '
+        'and (`evidence_feedback_answer`.`response` = 0)) '
+        ') is false)'
+    )
+    session.execute(insert_statement)
+    logging.debug('Insert statement executed, starting commit')
+    session.commit()
+    logging.debug('Insert complete')
+    value, = session.execute(text('SELECT COUNT(1) FROM targeted.evidence_2')).one()
+    # logging.info(value)
+    return value
+
+
 def get_superseded_chunk(session: Session) -> list[tuple[str, str]]:
     query_text = text("""
         SELECT e1.evidence_id, e2.document_id
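
A post-copy consistency check one might run against the staging tables (not part of the patch; check_staged_counts is a hypothetical helper, though the table names follow the INSERT statements above):

from sqlalchemy import text
from sqlalchemy.orm import Session


def check_staged_counts(session: Session) -> None:
    # Every staged evidence row should reference a staged assertion row.
    orphan_count, = session.execute(text(
        'SELECT COUNT(1) FROM targeted.evidence_2 e2 '
        'LEFT JOIN targeted.assertion_2 a2 ON a2.assertion_id = e2.assertion_id '
        'WHERE a2.assertion_id IS NULL'
    )).one()
    assert orphan_count == 0, f'{orphan_count} evidence rows lack a staged assertion'
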
@@ -179,6 +249,29 @@ def create_edge_dict(edge_data):
     return edge_dict
 
 
+def uniquify_edge_dict(edge_dict):
+    for assertion_id in edge_dict:
+        evidence_list = edge_dict[assertion_id]
+        if len(evidence_list) < 2:
+            continue
+        index_dict = {}
+        score_dict = {}
+        for position, ev in enumerate(evidence_list):
+            composite_key = f"{ev['assertion_id']}_{ev['evidence_id']}_{ev['document_id']}_{ev['sentence']}"
+            score = float(ev['score'])
+            if composite_key in score_dict:
+                if score > score_dict[composite_key]:
+                    index_dict[composite_key] = position
+                    score_dict[composite_key] = score
+            else:
+                index_dict[composite_key] = position
+                score_dict[composite_key] = score
+        new_evidence_list = []
+        for index in index_dict.values():
+            new_evidence_list.append(evidence_list[index])
+        edge_dict[assertion_id] = new_evidence_list
+
+
 def export_nodes(session: Session, bucket: str, blob_prefix: str):
     (node_curies, normal_dict) = get_node_data(session, use_uniprot=True)
     node_metadata = write_nodes(node_curies, normal_dict, 'nodes.tsv.gz')
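
A tiny worked example of the dedup pass (toy rows; the identifiers, sentences, and scores are illustrative). uniquify_edge_dict keeps the highest-scoring copy of each (assertion, evidence, document, sentence) combination:

from targeted import uniquify_edge_dict

rows = [
    {'assertion_id': 'A1', 'evidence_id': 'E1', 'document_id': 'PMID:1',
     'sentence': 'BRCA1 regulates TP53.', 'score': '0.91'},
    {'assertion_id': 'A1', 'evidence_id': 'E1', 'document_id': 'PMID:1',
     'sentence': 'BRCA1 regulates TP53.', 'score': '0.97'},  # same key, higher score
    {'assertion_id': 'A1', 'evidence_id': 'E2', 'document_id': 'PMID:2',
     'sentence': 'TP53 is regulated by BRCA1.', 'score': '0.80'},
]
edge_dict = {'A1': rows}
uniquify_edge_dict(edge_dict)
assert len(edge_dict['A1']) == 2               # duplicate collapsed
assert edge_dict['A1'][0]['score'] == '0.97'   # higher-scoring copy kept
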
@@ -205,5 +298,13 @@ def export_edges(session: Session, nodes: set, bucket: str, blob_prefix: str,
     for rows in get_edge_data(session, id_list, chunk_size, edge_limit):
         logging.info(f'Processing the next {len(rows)} rows')
         edge_dict = create_edge_dict(rows)
+        uniquify_edge_dict(edge_dict)
         services.write_edges(edge_dict, nodes, output_filename)
         services.upload_to_gcp(bucket, output_filename, f'{blob_prefix}{output_filename}')
+
+
+def export_edges_new(session: Session):
+    assertion_count = copy_assertion_records(session)
+    evidence_count = copy_evidence_records(session)
+    logging.info(f'Inserted {assertion_count} assertion records and {evidence_count} evidence records')
+
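
And a sketch of driving the new staging path end to end (the connection URL, driver, and logging setup are placeholders, not part of this change):

import logging

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import targeted

logging.basicConfig(level=logging.INFO)
# Placeholder DSN; point this at the database holding the assertion/evidence tables.
engine = create_engine('mysql+pymysql://user:password@localhost/targeted')
with Session(engine) as session:
    targeted.export_edges_new(session)  # stages the records and logs both counts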