diff --git a/services.py b/services.py
index f6da8e2..a02e25b 100644
--- a/services.py
+++ b/services.py
@@ -349,11 +349,11 @@ def get_evidence_json(row):
             "attribute_source": "infores:text-mining-provider-targeted "
         }
     ]
-    if row['document_year']:
+    if row['document_year_published']:
         nested_attributes.append(
             {
                 "attribute_type_id": "biolink:supporting_document_year",
-                "value": row['document_year'],
+                "value": row['document_year_published'],
                 "value_type_id": "UO:0000036",
                 "attribute_source": "infores:pubmed"
             }
@@ -383,12 +383,17 @@ def get_edge(rows, predicate):
         logging.debug(f'No relevant rows for predicate {predicate}')
         return None
     row1 = relevant_rows[0]
-    if (row1['object_curie'].startswith('PR:') and not row1['object_uniprot']) or \
-            (row1['subject_curie'].startswith('PR:') and not row1['subject_uniprot']):
+    if row1['object_curie'].startswith('PR:') or row1['subject_curie'].startswith('PR:'):
         logging.debug(f"Could not get uniprot for pr curie ({row1['object_curie']}|{row1['subject_curie']})")
         return None
-    sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
-    obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+    sub = row1['subject_curie']
+    obj = row1['object_curie']
+    # if (row1['object_curie'].startswith('PR:') and not row1['object_uniprot']) or \
+    #         (row1['subject_curie'].startswith('PR:') and not row1['subject_uniprot']):
+    #     logging.debug(f"Could not get uniprot for pr curie ({row1['object_curie']}|{row1['subject_curie']})")
+    #     return None
+    # sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
+    # obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
     supporting_study_results = '|'.join([f"tmkp:{row['evidence_id']}" for row in relevant_rows])
     supporting_publications = []
     for row in relevant_rows:
@@ -444,8 +449,10 @@ def write_edges(edge_dict, nodes, output_filename):
     with open(output_filename, 'a') as outfile:
         for assertion, rows in edge_dict.items():
             row1 = rows[0]
-            sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
-            obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+            # sub = row1['subject_uniprot'] if row1['subject_uniprot'] else row1['subject_curie']
+            # obj = row1['object_uniprot'] if row1['object_uniprot'] else row1['object_curie']
+            sub = row1['subject_curie']
+            obj = row1['object_curie']
             if sub not in nodes or obj not in nodes:
                 continue
             predicates = set([row['predicate_curie'] for row in rows])
@@ -507,6 +514,7 @@ def compress(infile, outfile):
         with gzip.open(outfile, 'wb') as gzfile:
             shutil.copyfileobj(textfile, gzfile)
 
+
 def decompress(infile, outfile):
     with gzip.open(infile, 'rb') as gzfile:
         with open(outfile, 'wb') as textfile:
diff --git a/targeted.py b/targeted.py
index 23030d1..74f7b6f 100644
--- a/targeted.py
+++ b/targeted.py
@@ -5,9 +5,11 @@
 import sqlalchemy
 from sqlalchemy import text
 from sqlalchemy.orm import Session
+from sqlalchemy import Column, String, Integer
+from sqlalchemy.orm import declarative_base
 
-from evidence import Evidence
 import services
 
+Model = declarative_base(name='Model')
 ROW_BATCH_SIZE = 10000
 HUMAN_TAXON = 'NCBITaxon:9606'
@@ -16,6 +18,33 @@
                        'PR:Q04746', 'PR:Q04746', 'PR:Q7XZU3']
 
 
+class Evidence(Model):
+    __tablename__ = 'evidence'
+    evidence_id = Column(String(65), primary_key=True)
+    assertion_id = Column(String(65))
+    document_id = Column(String(45))
+    sentence = Column(String(2000))
+    subject_entity_id = Column(String(65))
+    object_entity_id = Column(String(65))
+    document_zone = Column(String(45))
+    document_publication_type = Column(String(100))
+    document_year_published = Column(Integer)
+    superseded_by = Column(String(20))
+
+    def __init__(self, evidence_id, assertion_id, document_id, sentence, subject_entity_id, object_entity_id,
+                 document_zone, document_publication_type, document_year_published, superseded_by):
+        self.evidence_id = evidence_id
+        self.assertion_id = assertion_id
+        self.document_id = document_id
+        self.sentence = sentence
+        self.subject_entity_id = subject_entity_id
+        self.object_entity_id = object_entity_id
+        self.document_zone = document_zone
+        self.document_publication_type = document_publication_type
+        self.document_year_published = document_year_published
+        self.superseded_by = superseded_by
+
+
 def get_node_data(session: Session, use_uniprot: bool = False) -> (list[str], dict[str, dict]):
     """
     Get the subject and object curies from assertions, uniquifies the list,
@@ -27,18 +56,8 @@ def get_node_data(session: Session, use_uniprot: bool = False) -> (list[str], di
     """
     logging.info("Getting node data")
     logging.info(f"Mode: {'UniProt' if use_uniprot else 'PR'}")
-    if use_uniprot:
-        curies = [row[0] for row in session.query(sqlalchemy.text('DISTINCT IFNULL(uniprot, subject_curie) as curie '
-                                                                  'FROM assertion LEFT JOIN pr_to_uniprot ON '
-                                                                  'subject_curie = pr AND '
-                                                                  f'taxon = "{HUMAN_TAXON}"')).all()]
-        curies.extend([row[0] for row in session.query(sqlalchemy.text('DISTINCT IFNULL(uniprot, object_curie) as '
-                                                                       'curie FROM assertion LEFT JOIN pr_to_uniprot '
-                                                                       f'ON object_curie = pr AND '
-                                                                       f'taxon = "{HUMAN_TAXON}"')).all()])
-    else:
-        curies = [row[0] for row in session.query(sqlalchemy.text('DISTINCT subject_curie FROM assertion')).all()]
-        curies.extend([row[0] for row in session.query(sqlalchemy.text('DISTINCT object_curie FROM assertion')).all()])
+    curies = [row[0] for row in session.query(text('DISTINCT subject_curie FROM targeted.assertion')).all()]
+    curies.extend([row[0] for row in session.query(text('DISTINCT object_curie FROM targeted.assertion')).all()])
     curies = list(set(curies))
     logging.info(f'node curies retrieved and uniquified ({len(curies)})')
     curies = [curie for curie in curies if curie not in EXCLUDED_FIG_CURIES]
@@ -71,10 +90,11 @@ def write_nodes(curies: list[str], normalize_dict: dict[str, dict], output_filen
 
 
 def get_assertion_ids(session, limit=600000, offset=0):
-    id_query = text('SELECT assertion_id FROM assertion WHERE assertion_id NOT IN '
+    id_query = text('SELECT assertion_id FROM targeted.assertion WHERE assertion_id NOT IN '
                     '(SELECT DISTINCT(assertion_id) '
-                    'FROM assertion_evidence_feedback af INNER JOIN evidence_feedback_answer ef INNER JOIN evidence e '
-                    'ON e.evidence_id = af.evidence_id '
+                    'FROM assertion_evidence_feedback af '
+                    'INNER JOIN evidence_feedback_answer ef '
+                    'INNER JOIN evidence e ON e.evidence_id = af.evidence_id '
                     'INNER JOIN evidence_version ev ON ev.evidence_id = e.evidence_id '
                     'WHERE ef.prompt_text = \'Assertion Correct\' AND ef.response = 0 AND ev.version = 2) '
                     'AND subject_curie NOT IN :ex1 AND object_curie NOT IN :ex2 '
@@ -95,21 +115,18 @@ def get_edge_data(session: Session, id_list, chunk_size=1000, edge_limit=5) -> l
     logging.info(f'Partition count: {math.ceil(len(id_list) / chunk_size)}')
     main_query = text(
         'SELECT a.assertion_id, e.evidence_id, a.association_curie, e.predicate_curie, '
-        'a.subject_curie, su.uniprot AS subject_uniprot, a.object_curie, ou.uniprot AS object_uniprot, '
+        'a.subject_curie, a.object_curie, '
         'si.idf AS subject_idf, oi.idf AS object_idf, '
-        'e.document_id, e.document_zone, e.document_year, e.score, '
-        'e.sentence, e.subject_span, e.subject_text, e.object_span, e.object_text, '
-        '(SELECT COUNT(1) FROM top_evidences t2 '
+        'e.document_id, e.document_zone, e.document_year_published, e.score, '
+        'e.sentence, e.subject_span, e.subject_covered_text, e.object_span, e.object_covered_text, '
+        '(SELECT COUNT(1) FROM targeted.evidence t2 '
         'WHERE t2.assertion_id = a.assertion_id AND t2.predicate_curie = e.predicate_curie) AS evidence_count '
-        'FROM assertion a '
-        'INNER JOIN LATERAL '
-        f'(SELECT * FROM top_evidences te WHERE te.assertion_id = a.assertion_id LIMIT {edge_limit}) AS e '
-        f'ON a.assertion_id = e.assertion_id '
-        f'LEFT JOIN pr_to_uniprot su ON a.subject_curie = su.pr AND su.taxon = "{HUMAN_TAXON}" '
-        f'LEFT JOIN pr_to_uniprot ou ON a.object_curie = ou.pr AND ou.taxon = "{HUMAN_TAXON}" '
+        'FROM targeted.assertion a INNER JOIN LATERAL '
+        '(SELECT * FROM targeted.evidence te WHERE te.assertion_id = a.assertion_id AND te.document_zone <> \'REF\' '
+        f'ORDER BY te.score DESC LIMIT {edge_limit}) AS e ON a.assertion_id = e.assertion_id '
         'LEFT JOIN concept_idf si ON a.subject_curie = si.concept_curie '
        'LEFT JOIN concept_idf oi ON a.object_curie = oi.concept_curie '
-        'WHERE a.assertion_id IN :ids AND e.document_zone <> "REF" AND e.superseded_by IS NULL '
+        'WHERE a.assertion_id IN :ids '
         'ORDER BY a.assertion_id'
     )
     for i in range(0, len(id_list), chunk_size):
@@ -190,6 +207,7 @@ def copy_evidence_records(session: Session) -> int:
 
 
 def get_superseded_chunk(session: Session) -> list[tuple[str, str]]:
+    logging.info("get_superseded_chunk")
     query_text = text("""
         SELECT e1.evidence_id, e2.document_id
         FROM assertion a1
@@ -208,7 +226,7 @@ def get_superseded_chunk(session: Session) -> list[tuple[str, str]]:
         AND a1.subject_curie = a2.subject_curie
         AND a1.object_curie = a2.object_curie
         AND es1.predicate_curie = es2.predicate_curie
-        LIMIT 1000
+        LIMIT 10000
     """)
     eids = set([])
     ids_list = []
@@ -273,6 +291,7 @@ def uniquify_edge_dict(edge_dict):
 
 
 def export_nodes(session: Session, bucket: str, blob_prefix: str):
+    logging.info("Exporting Nodes")
     (node_curies, normal_dict) = get_node_data(session, use_uniprot=True)
     node_metadata = write_nodes(node_curies, normal_dict, 'nodes.tsv.gz')
     services.upload_to_gcp(bucket, 'nodes.tsv.gz', f'{blob_prefix}nodes.tsv.gz')
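A minimal usage sketch for the inline Evidence model this patch adds to targeted.py, assuming a reachable MySQL instance hosting the targeted schema. The connection URL and credentials are hypothetical placeholders; only the Evidence class and its columns come from the diff above.

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session

    from targeted import Evidence  # declarative model introduced in this diff

    # Hypothetical DSN; substitute the real host and credentials.
    engine = create_engine('mysql+pymysql://user:password@localhost/targeted')

    with Session(engine) as session:
        # Pull a few evidence rows that have not been superseded, newest first,
        # exercising the renamed document_year_published column.
        rows = (session.query(Evidence)
                .filter(Evidence.superseded_by.is_(None))
                .order_by(Evidence.document_year_published.desc())
                .limit(5)
                .all())
        for ev in rows:
            print(ev.evidence_id, ev.document_id, ev.document_year_published)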