diff --git a/pyproject.toml b/pyproject.toml index c59c8cd83..e3663591c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ kgcl-schema = "^0.6.8" funowl = ">=0.2.0" gilda = {version = ">=1.0.0", optional = true} semsimian = {version = ">=0.2.16", optional = true} +py-horned-owl = {version = ">=0.3.2", optional = true} kgcl-rdflib = "0.5.0" llm = {version = "*", optional = true} html2text = {version = "*", optional = true} @@ -79,6 +80,7 @@ gilda = ["scipy", "gilda", "urllib3"] llm = ["llm", "aiohttp", "html2text"] seaborn = ["seaborn"] semsimian = ["semsimian"] +owl = ["py-horned-owl"] [tool.black] line-length = 100 diff --git a/src/oaklib/implementations/owl/__init__.py b/src/oaklib/implementations/owl/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/oaklib/implementations/owl/owl_implementation.py b/src/oaklib/implementations/owl/owl_implementation.py new file mode 100644 index 000000000..36f60891a --- /dev/null +++ b/src/oaklib/implementations/owl/owl_implementation.py @@ -0,0 +1,205 @@ +import logging +from dataclasses import dataclass +from typing import Any, Iterable, List, Mapping, Optional + +import rdflib +from pyhornedowl import PyIndexedOntology +from pyhornedowl.model import ( + IRI, + AnnotationAssertion, + Axiom, + ObjectSomeValuesFrom, + SubClassOf, +) +from funowl.converters.functional_converter import to_python +from funowl.writers.FunctionalWriter import FunctionalWriter +from kgcl_schema.datamodel import kgcl + +from oaklib.datamodels.vocabulary import ( + DEPRECATED_PREDICATE, + HAS_DEFINITION_CURIE, + HAS_EXACT_SYNONYM, + IS_A, + LABEL_PREDICATE, +) +from oaklib.interfaces import SearchInterface +from oaklib.interfaces.basic_ontology_interface import LANGUAGE_TAG +from oaklib.interfaces.owl_interface import OwlInterface, ReasonerConfiguration +from oaklib.interfaces.patcher_interface import PatcherInterface +from oaklib.types import CURIE, PRED_CURIE + + +@dataclass +class OwlImplementation(OwlInterface, PatcherInterface, SearchInterface): + """ + An experimental partial implementation of :ref:`OwlInterface` + + Wraps FunOWL + + ``_ + + """ + + ontology: PyIndexedOntology = None + + def __post_init__(self): + if self.ontology is None: + resource = self.resource + if resource is None or resource.local_path is None: + ontology = PyIndexedOntology() + else: + logging.info(f"Loading {resource.local_path} into FunOwl") + import pyhornedowl + ontology = pyhornedowl.open_ontology(str(resource.local_path)) + self.ontology = ontology + + @property + def _ontology(self): + return self.ontology + + def entity_iri_to_curie(self, entity: IRI) -> CURIE: + uri = entity.to_rdf(self.functional_writer.g) + return self.uri_to_curie(str(uri), use_uri_fallback=True) + + def curie_to_entity_iri(self, curie: CURIE) -> IRI: + return IRI(self.curie_to_uri(curie)) + + def _single_valued_assignment(self, curie: CURIE, property: CURIE) -> Optional[str]: + labels = [a.value for a in self.annotation_assertion_axioms(curie, property=property)] + if labels: + if len(labels) > 1: + logging.warning(f"Multiple labels for {curie} = {labels}") + val = labels[0] + rdf_v = val.to_rdf(self.functional_writer.g) + if isinstance(rdf_v, rdflib.Literal): + return rdf_v.value + else: + raise ValueError(f"Label must be literal, not {val}") + + def definition(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> Optional[str]: + return self._single_valued_assignment(curie, HAS_DEFINITION_CURIE) + + def label(self, curie: CURIE, lang: Optional[LANGUAGE_TAG] = None) -> str: + labels = [ + a.value for a in self.annotation_assertion_axioms(curie, property=LABEL_PREDICATE) + ] + if labels: + if len(labels) > 1: + logging.warning(f"Multiple labels for {curie} = {labels}") + label = labels[0] + rdf_v = label.to_rdf(self.functional_writer.g) + if isinstance(rdf_v, rdflib.Literal): + return rdf_v.value + else: + raise ValueError(f"Label must be literal, not {label}") + + def entities(self, filter_obsoletes=True, owl_type=None) -> Iterable[CURIE]: + for ax in self._ontology.axioms: + if isinstance(ax, Declaration): + uri = ax.v.full_uri(self.functional_writer.g) + try: + yv = self.uri_to_curie(str(uri)) + except ValueError: + logging.warning( + "could not compress URI %s with functional writer context %s", + uri, + list(self.functional_writer.g.namespaces()), + ) + continue + else: + yield yv + + def axioms(self, reasoner: Optional[ReasonerConfiguration] = None) -> Iterable[Axiom]: + ont = self._ontology + for axiom in ont.get_axioms(): + yield axiom + + def set_axioms(self, axioms: List[Axiom]) -> None: + self._ontology.axioms = axioms + + def dump(self, path: str = None, syntax: str = None, **kwargs): + if syntax is None or syntax == "ofn": + out = self.ontology_document.to_functional(self.functional_writer) + elif syntax == "ttl" or syntax == "turtle": + g = rdflib.Graph() + self.ontology_document.to_rdf(g) + out = g.serialize(format="ttl") + else: + out = str(self.ontology_document) + if path is None: + print(out) + elif isinstance(path, str): + with open(path, "w", encoding="UTF-8") as file: + file.write(str(out)) + else: + path.write(str(out)) + + # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + # Implements: PatcherInterface + # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + def _set_annotation_predicate_value(self, subject: CURIE, property: CURIE, value: Any): + for axiom in self.annotation_assertion_axioms(subject, property): + self._ontology.axioms.remove(axiom) + self._ontology.axioms.append( + AnnotationAssertion( + subject=self.curie_to_entity_iri(subject), + property=self.curie_to_entity_iri(property), + value=value, + ) + ) + + def apply_patch( + self, + patch: kgcl.Change, + activity: kgcl.Activity = None, + metadata: Mapping[PRED_CURIE, Any] = None, + configuration: kgcl.Configuration = None, + ) -> Optional[kgcl.Change]: + if isinstance(patch, kgcl.NodeChange): + about = patch.about_node + if isinstance(patch, kgcl.NodeRename): + self._set_annotation_predicate_value(about, LABEL_PREDICATE, patch.new_value) + elif isinstance(patch, kgcl.NodeTextDefinitionChange): + self._set_annotation_predicate_value(about, HAS_DEFINITION_CURIE, patch.new_value) + elif isinstance(patch, kgcl.NewSynonym): + self._ontology.axioms.append( + AnnotationAssertion( + subject=about, + property=self.curie_to_entity_iri(HAS_EXACT_SYNONYM), + value=patch.new_value, + ) + ) + elif isinstance(patch, kgcl.NodeObsoletion): + self._set_annotation_predicate_value(about, DEPRECATED_PREDICATE, value=True) + elif isinstance(patch, kgcl.NodeDeletion): + raise NotImplementedError("Deletions not supported yet") + elif isinstance(patch, kgcl.NodeCreation): + self._set_annotation_predicate_value(about, LABEL_PREDICATE, patch.name) + elif isinstance(patch, kgcl.NameBecomesSynonym): + label = self.label(about) + self.apply_patch( + kgcl.NodeRename(id=f"{patch.id}-1", about_node=about, new_value=patch.new_value) + ) + self.apply_patch( + kgcl.NewSynonym(id=f"{patch.id}-2", about_node=about, new_value=label) + ) + else: + raise NotImplementedError(f"Cannot handle patches of type {type(patch)}") + elif isinstance(patch, kgcl.EdgeChange): + about = patch.about_edge + subject = self.curie_to_uri(patch.subject) + object = self.curie_to_uri(patch.object) + if isinstance(patch, kgcl.EdgeCreation): + if patch.predicate == IS_A or patch.predicate == "is_a": + self._ontology.axioms.append(SubClassOf(subject, object)) + else: + predicate = self.curie_to_entity_iri(patch.predicate) + self._ontology.axioms.append( + SubClassOf(subject, ObjectSomeValuesFrom(predicate, object)) + ) + else: + raise NotImplementedError(f"Cannot handle patches of type {type(patch)}") + else: + raise NotImplementedError(f"Cannot handle patches of type {type(patch)}") + return patch diff --git a/tests/test_implementations/test_owl.py b/tests/test_implementations/test_owl.py new file mode 100644 index 000000000..04f2ec437 --- /dev/null +++ b/tests/test_implementations/test_owl.py @@ -0,0 +1,90 @@ +import logging +import unittest + +from pyhornedowl.model import EquivalentClasses, SubClassOf +from kgcl_schema.datamodel import kgcl +from oaklib.implementations.owl.owl_implementation import OwlImplementation +from oaklib.interfaces.obograph_interface import OboGraphInterface +from oaklib.interfaces.owl_interface import AxiomFilter +from oaklib.resource import OntologyResource +from oaklib.utilities.kgcl_utilities import generate_change_id + +from tests import CHEBI_NUCLEUS, HUMAN, INPUT_DIR, NUCLEUS, VACUOLE + +TEST_ONT = INPUT_DIR / "go-nucleus.owl" +TEST_INST_ONT = INPUT_DIR / "inst.ofn" +NEW_NAME = "new name" + + +class TestWwlImplementation(unittest.TestCase): + def setUp(self) -> None: + resource = OntologyResource(TEST_ONT) + self.oi = OwlImplementation(resource) + + def test_axioms(self): + for ann_axiom in self.oi.axioms(): + axiom = ann_axiom.axiom + print(axiom, type(axiom)) + + def test_entities(self): + curies = list(self.oi.entities()) + self.assertIn(NUCLEUS, curies) + self.assertIn(CHEBI_NUCLEUS, curies) + self.assertIn(HUMAN, curies) + + @unittest.skip("OboGraph not yet implemented") + def test_edges(self): + oi = self.oi + curies = list(oi.entities()) + if isinstance(oi, OboGraphInterface): + for curie in curies: + for rel in oi.outgoing_relationships(curie): + logging.info(rel) + else: + raise NotImplementedError + + def test_filter_axioms(self): + FunctionalWriter() + oi = self.oi + self.assertCountEqual( + list(oi.axioms()), + list(oi.filter_axioms(AxiomFilter())), + "empty axiom filter should return all axioms", + ) + subclass_axioms = list(oi.filter_axioms(AxiomFilter(type=SubClassOf))) + for ax in subclass_axioms: + self.assertEqual(type(ax), SubClassOf) + self.assertGreater(len(subclass_axioms), 10) + ec_axioms = list(oi.equivalence_axioms()) + for ax in ec_axioms: + self.assertEqual(type(ax), EquivalentClasses) + self.assertGreater(len(ec_axioms), 10) + nucleus_axioms = list(oi.filter_axioms(AxiomFilter(about=NUCLEUS))) + n_subclass = 0 + for ax in nucleus_axioms: + if isinstance(ax, SubClassOf): + n_subclass += 1 + self.assertEqual(NUCLEUS, oi.entity_iri_to_curie(ax.subClassExpression)) + self.assertEqual(n_subclass, 3) + self.assertGreater(len(nucleus_axioms), 2) + nucleus_ref_axioms = list(oi.filter_axioms(AxiomFilter(references=NUCLEUS))) + n_ref_subclass = 0 + for ax in nucleus_ref_axioms: + if isinstance(ax, SubClassOf): + n_ref_subclass += 1 + self.assertGreater(n_ref_subclass, 3) + self.assertGreater(len(nucleus_ref_axioms), 3) + for ax in nucleus_axioms: + self.assertIn(ax, nucleus_ref_axioms) + + def test_patcher(self): + oi = self.oi + anns = list(oi.annotation_assertion_axioms(NUCLEUS)) + self.assertGreater(len(anns), 5) + label = oi.label(NUCLEUS) + self.assertEqual("nucleus", label) + oi.apply_patch( + kgcl.NodeRename(id=generate_change_id(), about_node=VACUOLE, new_value=NEW_NAME) + ) + label = oi.label(VACUOLE) + self.assertEqual(NEW_NAME, label)