From 565d782f000c68701e69adcbe648e6c56a5134d5 Mon Sep 17 00:00:00 2001 From: john681611 Date: Mon, 16 Oct 2023 16:23:17 +0100 Subject: [PATCH] Refactor get_CREs --- application/database/db.py | 126 ++++------ application/tests/db_test.py | 447 +++++++++++++++++------------------ 2 files changed, 262 insertions(+), 311 deletions(-) diff --git a/application/database/db.py b/application/database/db.py index 3a6685275..807b249bb 100644 --- a/application/database/db.py +++ b/application/database/db.py @@ -201,7 +201,12 @@ def to_cre_def(self, node, parse_links=True): def get_links(self, links_dict): links = [] for key in links_dict: - links.extend([cre_defs.Link(c.to_cre_def(c, parse_links=False), key) for c in links_dict[key]]) + links.extend( + [ + cre_defs.Link(c.to_cre_def(c, parse_links=False), key) + for c in links_dict[key] + ] + ) return links @@ -265,7 +270,7 @@ def to_cre_def(self, node, parse_links=True) -> cre_defs.Code: class NeoCRE(NeoDocument): # type: ignore external_id = StringProperty() - contained_in = RelationshipFrom('NeoCRE', 'CONTAINS', model=ContainsRel) + contained_in = RelationshipFrom("NeoCRE", "CONTAINS", model=ContainsRel) contains = RelationshipTo("NeoCRE", "CONTAINS", model=ContainsRel) linked = RelationshipTo("NeoStandard", "LINKED_TO", model=LinkedToRel) same_as = RelationshipTo("NeoStandard", "SAME", model=SameRel) @@ -277,12 +282,16 @@ def to_cre_def(self, node, parse_links=True) -> cre_defs.CRE: id=node.external_id, description=node.description, tags=node.tags, - links=self.get_links({ - 'Contains': [*node.contains, *node.contained_in], - 'Linked To': node.linked, - 'Same as': node.same_as, - 'Related': node.related - }) if parse_links else [] + links=self.get_links( + { + "Contains": [*node.contains, *node.contained_in], + "Linked To": node.linked, + "Same as": node.same_as, + "Related": node.related, + } + ) + if parse_links + else [], ) @@ -343,7 +352,7 @@ def add_cre(self, dbcre: CRE): "description": dbcre.description, "links": [], # dbcre.links, "tags": [dbcre.tags] if isinstance(dbcre.tags, str) else dbcre.tags, - "external_id": dbcre.external_id + "external_id": dbcre.external_id, } ) @@ -503,7 +512,7 @@ def standards(self) -> List[str]: @staticmethod def parse_node(node: NeoDocument) -> cre_defs.Document: return node.to_cre_def(node) - + @staticmethod def parse_node_no_links(node: NeoDocument) -> cre_defs.Document: return node.to_cre_def(node, parse_links=False) @@ -1015,83 +1024,30 @@ def get_CREs( "You need to search by external_id, internal_id name or description" ) return [] - if external_id: - if not partial: - return [NEO_DB.parse_node(NeoCRE.nodes.get(external_id=external_id))] - # query = query.filter(CRE.external_id == external_id) - else: - query = query.filter(CRE.external_id.like(external_id)) - if name: - if not partial: - query = query.filter(func.lower(CRE.name) == name.lower()) - else: - query = query.filter(func.lower(CRE.name).like(name.lower())) - if description: - if not partial: - query = query.filter(func.lower(CRE.description) == description.lower()) - else: - query = query.filter( - func.lower(CRE.description).like(description.lower()) - ) - if internal_id: - query = CRE.query.filter(CRE.id == internal_id) - - dbcres = query.all() - if not dbcres: - logger.warning( - "CRE %s:%s:%s does not exist in the db" - % (external_id, name, description) + params = dict( + external_id=external_id, + name=name, + description=description, + id=internal_id, + ) + if partial: + params = dict( + external_id__icontains=external_id, + name__icontains=name, + description__icontains=description, + id__icontains=internal_id, ) - return [] - # todo figure a way to return both the Node - # and the link_type for that link - for dbcre in dbcres: - cre = CREfromDB(dbcre) - linked_nodes = self.session.query(Links).filter(Links.cre == dbcre.id).all() - for ls in linked_nodes: - nd = self.session.query(Node).filter(Node.id == ls.node).first() - if not include_only or (include_only and nd.name in include_only): - cre.add_link( - cre_defs.Link( - document=nodeFromDB(nd), - ltype=cre_defs.LinkTypes.from_str(ls.type), - ) - ) - # todo figure the query to merge the following two - internal_links = ( - self.session.query(InternalLinks) - .filter( - sqla.or_( - InternalLinks.cre == dbcre.id, InternalLinks.group == dbcre.id - ) - ) - .all() - ) - for il in internal_links: - q = self.session.query(CRE) - - res: CRE - ltype = cre_defs.LinkTypes.from_str(il.type) - - if il.cre == dbcre.id: # if we are a CRE in this relationship - res = q.filter( - CRE.id == il.group - ).first() # get the group in order to add the link - # if this CRE is the lower level cre the relationship will be tagged "Contains" - # in that case the implicit relationship is "Is Part Of" - # otherwise the relationship will be "Related" and we don't need to do anything - if ltype == cre_defs.LinkTypes.Contains: - # important, this is the only implicit link we have for now - ltype = cre_defs.LinkTypes.PartOf - elif ltype == cre_defs.LinkTypes.PartOf: - ltype = cre_defs.LinkTypes.Contains - elif il.group == dbcre.id: - res = q.filter(CRE.id == il.cre).first() - ltype = cre_defs.LinkTypes.from_str(il.type) - cre.add_link(cre_defs.Link(document=CREfromDB(res), ltype=ltype)) - cres.append(cre) - return cres + params_filtered = {k: v for k, v in params.items() if v is not None} + parsed_response = [ + NEO_DB.parse_node(x) for x in NeoCRE.nodes.filter(**params_filtered) + ] + if include_only: + for node in parsed_response: + node.links = [ + link for link in node.links if link.document.name in include_only + ] + return parsed_response def export(self, dir: str = None, dry_run: bool = False) -> List[cre_defs.Document]: """Exports the database to a CRE file collection on disk""" diff --git a/application/tests/db_test.py b/application/tests/db_test.py index 6fafe162d..c35d44389 100644 --- a/application/tests/db_test.py +++ b/application/tests/db_test.py @@ -475,162 +475,206 @@ def test_find_cres_of_standard(self) -> None: cres = self.collection.find_cres_of_node(lone_standard) self.assertIsNone(cres) - def test_get_CREs(self) -> None: - """Given: a cre 'C1' that links to cres both as a group and a cre and other standards - return the CRE in Document format""" + @patch.object(db.NeoCRE, "nodes") + def test_get_CREs_no_match(self, nodes_mock) -> None: + nodes_mock.filter.return_value = [] + collection = db.Node_collection() - dbc1 = db.CRE(external_id="123", description="gcCD1", name="gcC1") - dbc2 = db.CRE(description="gcCD2", name="gcC2") - dbc3 = db.CRE(description="gcCD3", name="gcC3") - db_id_only = db.CRE(description="c_get_by_internal_id_only", name="cgbiio") - dbs1 = db.Node( - ntype=defs.Standard.__name__, - name="gcS2", - section="gc1", - subsection="gc2", - link="gc3", - version="gc1.1.1", + + self.assertEqual([], collection.get_CREs(external_id="123")) + + @patch.object(db.NeoCRE, "nodes") + def test_get_CREs_by_single_parameter(self, nodes_mock) -> None: + db_response = db.NeoCRE( + external_id="123", + description="gcCD1", + name="gcC1", + contained_in=[], + contains=[ + db.NeoCRE(external_id="123", description="gcCD2", name="gcC2"), + db.NeoCRE(external_id="123", description="gcCD3", name="gcC3"), + ], + linked=[ + db.NeoStandard( + hyperlink="gc3", + name="gcS2", + section="gc1", + subsection="gc2", + version="gc1.1.1", + ) + ], + same_as=[], + related=[], ) + nodes_mock.filter.return_value = [db_response] + expected = db.NEO_DB.parse_node(db_response) - dbs2 = db.Node( - ntype=defs.Standard.__name__, - name="gcS3", - section="gc1", - subsection="gc2", - link="gc3", - version="gc3.1.2", - ) - - collection.session.add(dbc1) - collection.session.add(dbc2) - collection.session.add(dbc3) - collection.session.add(dbs1) - collection.session.add(dbs2) - collection.session.add(db_id_only) - collection.session.commit() + collection = db.Node_collection() - collection.session.add( - db.InternalLinks(type="Contains", group=dbc1.id, cre=dbc2.id) - ) - collection.session.add( - db.InternalLinks(type="Contains", group=dbc1.id, cre=dbc3.id) + res = collection.get_CREs(external_id="123") + self.assertEqual(1, len(res)) + self.assertDictEqual(expected.todict(), res[0].todict()) + nodes_mock.filter.assert_called_with(external_id="123") + + res2 = collection.get_CREs(name="gcC1") + self.assertEqual(1, len(res2)) + self.assertDictEqual(expected.todict(), res2[0].todict()) + nodes_mock.filter.assert_called_with(name="gcC1") + + res3 = collection.get_CREs(description="gcCD1") + self.assertEqual(1, len(res3)) + self.assertDictEqual(expected.todict(), res3[0].todict()) + nodes_mock.filter.assert_called_with(description="gcCD1") + + res3 = collection.get_CREs(internal_id="abc") + self.assertEqual(1, len(res3)) + self.assertDictEqual(expected.todict(), res3[0].todict()) + nodes_mock.filter.assert_called_with(id="abc") + + @patch.object(db.NeoCRE, "nodes") + def test_get_CREs_by_single_parameter_partial(self, nodes_mock) -> None: + db_response = db.NeoCRE( + external_id="123", + description="gcCD1", + name="gcC1", + contained_in=[], + contains=[ + db.NeoCRE(external_id="123", description="gcCD2", name="gcC2"), + db.NeoCRE(external_id="123", description="gcCD3", name="gcC3"), + ], + linked=[ + db.NeoStandard( + hyperlink="gc3", + name="gcS2", + section="gc1", + subsection="gc2", + version="gc1.1.1", + ) + ], + same_as=[], + related=[], ) - collection.session.add(db.Links(type="Linked To", cre=dbc1.id, node=dbs1.id)) + nodes_mock.filter.return_value = [db_response] + expected = db.NEO_DB.parse_node(db_response) - collection.session.commit() + collection = db.Node_collection() + + res = collection.get_CREs(partial=True, external_id="123") + self.assertEqual(1, len(res)) + self.assertDictEqual(expected.todict(), res[0].todict()) + nodes_mock.filter.assert_called_with(external_id__icontains="123") + + res2 = collection.get_CREs(partial=True, name="gcC1") + self.assertEqual(1, len(res2)) + self.assertDictEqual(expected.todict(), res2[0].todict()) + nodes_mock.filter.assert_called_with(name__icontains="gcC1") + + res3 = collection.get_CREs(partial=True, description="gcCD1") + self.assertEqual(1, len(res3)) + self.assertDictEqual(expected.todict(), res3[0].todict()) + nodes_mock.filter.assert_called_with(description__icontains="gcCD1") + + res3 = collection.get_CREs(partial=True, internal_id="abc") + self.assertEqual(1, len(res3)) + self.assertDictEqual(expected.todict(), res3[0].todict()) + nodes_mock.filter.assert_called_with(id__icontains="abc") + + @patch.object(db.NeoCRE, "nodes") + def test_get_CREs_by_combination(self, nodes_mock) -> None: + db_response = db.NeoCRE( + external_id="123", + description="gcCD1", + name="gcC1", + contained_in=[], + contains=[ + db.NeoCRE(external_id="123", description="gcCD2", name="gcC2"), + db.NeoCRE(external_id="123", description="gcCD3", name="gcC3"), + ], + linked=[ + db.NeoStandard( + hyperlink="gc3", + name="gcS2", + section="gc1", + subsection="gc2", + version="gc1.1.1", + ) + ], + same_as=[], + related=[], + ) + nodes_mock.filter.return_value = [db_response] + expected = db.NEO_DB.parse_node(db_response) - cd1 = defs.CRE(id="123", description="gcCD1", name="gcC1", links=[]) - cd2 = defs.CRE(description="gcCD2", name="gcC2") - cd3 = defs.CRE(description="gcCD3", name="gcC3") - c_id_only = defs.CRE(description="c_get_by_internal_id_only", name="cgbiio") + collection = db.Node_collection() - expected = [ - copy(cd1) - .add_link( - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Standard( + res = collection.get_CREs(external_id="123", name="gcCD1") + self.assertEqual(1, len(res)) + self.assertDictEqual(expected.todict(), res[0].todict()) + nodes_mock.filter.assert_called_with(external_id="123", name="gcCD1") + + res = collection.get_CREs(external_id="123", name="gcCD1", description="gcCD1") + self.assertEqual(1, len(res)) + self.assertDictEqual(expected.todict(), res[0].todict()) + nodes_mock.filter.assert_called_with( + external_id="123", name="gcCD1", description="gcCD1" + ) + + res = collection.get_CREs( + external_id="123", name="gcCD1", description="gcCD1", internal_id="abc" + ) + self.assertEqual(1, len(res)) + self.assertDictEqual(expected.todict(), res[0].todict()) + nodes_mock.filter.assert_called_with( + external_id="123", name="gcCD1", description="gcCD1", id="abc" + ) + + @patch.object(db.NeoCRE, "nodes") + def test_get_CREs_by_include_only(self, nodes_mock) -> None: + nodes_mock.filter.return_value = [ + db.NeoCRE( + external_id="123", + description="gcCD1", + name="gcC1", + contained_in=[], + contains=[ + db.NeoCRE(external_id="123", description="gcCD2", name="gcC2"), + db.NeoCRE(external_id="123", description="gcCD3", name="gcC3"), + ], + linked=[ + db.NeoStandard( + hyperlink="gc3", name="gcS2", section="gc1", subsection="gc2", - hyperlink="gc3", version="gc1.1.1", - ), - ) - ) - .add_link( - defs.Link( - ltype=defs.LinkTypes.Contains, - document=copy(cd2), - ) + ) + ], + same_as=[], + related=[], ) - .add_link(defs.Link(ltype=defs.LinkTypes.Contains, document=copy(cd3))) ] - self.maxDiff = None - shallow_cd1 = copy(cd1) - shallow_cd1.links = [] - cd2.add_link(defs.Link(ltype=defs.LinkTypes.PartOf, document=shallow_cd1)) - cd3.add_link(defs.Link(ltype=defs.LinkTypes.PartOf, document=shallow_cd1)) - self.assertEqual([], collection.get_CREs()) - - res = collection.get_CREs(name="gcC1") - self.assertEqual(len(expected), len(res)) - self.assertDictEqual(expected[0].todict(), res[0].todict()) - res = collection.get_CREs(external_id="123") - self.assertEqual(len(expected), len(res)) - self.assertDictEqual(expected[0].todict(), res[0].todict()) - - res = collection.get_CREs(external_id="12%", partial=True) - self.assertEqual(len(expected), len(res)) - self.assertDictEqual(expected[0].todict(), res[0].todict()) - - res = collection.get_CREs(name="gcC%", partial=True) - - res = collection.get_CREs(external_id="1%", name="gcC%", partial=True) - self.assertEqual(len(expected), len(res)) - self.assertDictEqual(expected[0].todict(), res[0].todict()) - - res = collection.get_CREs(description="gcCD1") - self.assertEqual(len(expected), len(res)) - self.assertDictEqual(expected[0].todict(), res[0].todict()) - - res = collection.get_CREs(external_id="1%", description="gcC%", partial=True) - self.assertEqual(len(expected), len(res)) - self.assertDictEqual(expected[0].todict(), res[0].todict()) - - res = collection.get_CREs(description="gcC%", name="gcC%", partial=True) - want = [expected[0], cd2, cd3] - for el in res: - found = False - for wel in want: - if el.todict() == wel.todict(): - found = True - self.assertTrue(found) - - self.assertEqual([], collection.get_CREs(external_id="123", name="gcC5")) - self.assertEqual([], collection.get_CREs(external_id="1234")) - self.assertEqual([], collection.get_CREs(name="gcC5")) - - collection.session.add(db.Links(type="Linked To", cre=dbc1.id, node=dbs2.id)) - - only_gcS2 = deepcopy(expected) - expected[0].add_link( - defs.Link( - ltype=defs.LinkTypes.LinkedTo, - document=defs.Standard( - name="gcS3", - section="gc1", - subsection="gc2", - hyperlink="gc3", - version="gc3.1.2", - ), + expected = db.NEO_DB.parse_node( + db.NeoCRE( + external_id="123", + description="gcCD1", + name="gcC1", + contained_in=[], + contains=[ + db.NeoCRE(external_id="123", description="gcCD2", name="gcC2") + ], + linked=[], + same_as=[], + related=[], ) ) - res = collection.get_CREs(name="gcC1") - self.assertCountEqual(expected[0].todict(), res[0].todict()) - - res = collection.get_CREs(name="gcC1", include_only=["gcS2"]) - self.assertDictEqual(only_gcS2[0].todict(), res[0].todict()) - - ccd2 = copy(cd2) - ccd2.links = [] - ccd3 = copy(cd3) - ccd3.links = [] - no_standards = [ - copy(cd1) - .add_link( - defs.Link( - ltype=defs.LinkTypes.Contains, - document=ccd2, - ) - ) - .add_link(defs.Link(ltype=defs.LinkTypes.Contains, document=ccd3)) - ] - res = collection.get_CREs(name="gcC1", include_only=["gcS0"]) - self.assertEqual(no_standards, res) - self.assertEqual([c_id_only], collection.get_CREs(internal_id=db_id_only.id)) + collection = db.Node_collection() + + res = collection.get_CREs(external_id="123", include_only=["gcC2"]) + self.assertEqual(1, len(res)) + self.assertDictEqual(expected.todict(), res[0].todict()) + nodes_mock.filter.assert_called_with(external_id="123") def test_get_standards(self) -> None: """Given: a Standard 'S1' that links to cres @@ -1046,99 +1090,50 @@ def test_object_select(self) -> None: self.assertEqual(collection.object_select(None), []) - def test_get_root_cres(self): - """Given: - 6 CRES: - * C0 <-- Root - * C1 <-- Root - * C2 Part Of C0 - * C3 Part Of C1 - * C4 Part Of C2 - * C5 Related to C0 - * C6 Part Of C1 but registered as C6 being the "group" - * C7 Contains C6 but registered as C6 being the "group" <-- Root - 3 Nodes: - * N0 Unlinked - * N1 Linked To C1 - * N2 Linked to C2 - * N3 Linked to C3 - * N4 Linked to C4 - Get_root_cres should return C0, C1 - """ - cres = [] - nodes = [] - dbcres = [] - dbnodes = [] - sqla.session.remove() - sqla.drop_all() - sqla.create_all() + @patch.object(db.NeoCRE, "nodes") + def test_get_root_cres(self, nodes_mock): collection = db.Node_collection() - collection.graph.graph = db.CRE_Graph.load_cre_graph(sqla.session) - - for i in range(0, 8): - if i == 0 or i == 1: - cres.append(defs.CRE(name=f">> C{i}", id=f"{i}")) - else: - cres.append(defs.CRE(name=f"C{i}", id=f"{i}")) - - dbcres.append(collection.add_cre(cres[i])) - nodes.append(defs.Standard(section=f"S{i}", name=f"N{i}")) - dbnodes.append(collection.add_node(nodes[i])) - cres[i].add_link( - defs.Link(document=copy(nodes[i]), ltype=defs.LinkTypes.LinkedTo) - ) - collection.add_link( - cre=dbcres[i], node=dbnodes[i], type=defs.LinkTypes.LinkedTo + db_response = [ + db.NeoCRE( + external_id="123", + description="gcCD1", + name="gcC1", + contained_in=[], + contains=[ + db.NeoCRE(external_id="123", description="gcCD2", name="gcC2"), + db.NeoCRE(external_id="123", description="gcCD3", name="gcC3"), + ], + linked=[ + db.NeoStandard( + hyperlink="gc3", + name="gcS2", + section="gc1", + subsection="gc2", + version="gc1.1.1", + ) + ], + same_as=[], + related=[], + ), + db.NeoCRE( + external_id="345", + description="gcCD2", + name="gcC2", + contained_in=[], + contains=[], + linked=[], + same_as=[], + related=[], ) - - cres[0].add_link( - defs.Link(document=cres[2].shallow_copy(), ltype=defs.LinkTypes.Contains) - ) - cres[0].add_link( - defs.Link(document=cres[5].shallow_copy(), ltype=defs.LinkTypes.Related) - ) - cres[1].add_link( - defs.Link(document=cres[3].shallow_copy(), ltype=defs.LinkTypes.Contains) - ) - cres[2].add_link( - defs.Link(document=cres[4].shallow_copy(), ltype=defs.LinkTypes.Contains) - ) - - cres[3].add_link( - defs.Link(document=cres[5].shallow_copy(), ltype=defs.LinkTypes.Contains) - ) - - cres[6].add_link( - defs.Link(document=cres[7].shallow_copy(), ltype=defs.LinkTypes.PartOf) - ) - collection.add_internal_link( - group=dbcres[0], cre=dbcres[2], type=defs.LinkTypes.Contains - ) - collection.add_internal_link( - group=dbcres[1], cre=dbcres[3], type=defs.LinkTypes.Contains - ) - collection.add_internal_link( - group=dbcres[2], cre=dbcres[4], type=defs.LinkTypes.Contains - ) - collection.add_internal_link( - group=dbcres[5], cre=dbcres[0], type=defs.LinkTypes.Related - ) - collection.add_internal_link( - group=dbcres[3], cre=dbcres[5], type=defs.LinkTypes.Contains - ) - collection.add_internal_link( - group=dbcres[6], cre=dbcres[7], type=defs.LinkTypes.PartOf - ) - - collection.session.commit() - - cres[7].add_link( - defs.Link(document=cres[6].shallow_copy(), ltype=defs.LinkTypes.Contains) - ) + ] + collection = db.Node_collection() + nodes_mock.has.return_value = db_response + expected = [db.NEO_DB.parse_node(x).todict() for x in db_response] root_cres = collection.get_root_cres() self.maxDiff = None - self.assertEqual(root_cres, [cres[0], cres[1], cres[7]]) + self.assertEqual([x.todict() for x in root_cres], expected) + nodes_mock.has.assert_called_with(contained_in=False) @patch.object(db.NEO_DB, "gap_analysis") def test_gap_analysis_disconnected(self, gap_mock):