Skip to content

Commit

Permalink
shacl utilities: Add new SHACL path building utility with correspondi…
Browse files Browse the repository at this point in the history
…ng tests
  • Loading branch information
mgberg committed Nov 21, 2024
1 parent 9106eee commit c0f768f
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 3 deletions.
119 changes: 118 additions & 1 deletion rdflib/extras/shacl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@

from typing import TYPE_CHECKING

from rdflib import Graph, Literal, URIRef, paths
from rdflib import BNode, Graph, Literal, URIRef, paths
from rdflib.collection import Collection
from rdflib.namespace import RDF, SH
from rdflib.paths import Path

if TYPE_CHECKING:
from rdflib.graph import _ObjectType
from rdflib.term import IdentifiedNode


class SHACLPathError(Exception):
pass


# Map the variable length path operators to the corresponding SHACL path predicates
_PATH_MOD_TO_PRED = {
paths.ZeroOrMore: SH.zeroOrMorePath,
paths.OneOrMore: SH.oneOrMorePath,
paths.ZeroOrOne: SH.zeroOrOnePath,
}


# This implementation is roughly based on
# pyshacl.helper.sparql_query_helper::SPARQLQueryHelper._shacl_path_to_sparql_path
def parse_shacl_path(
Expand Down Expand Up @@ -93,3 +103,110 @@ def parse_shacl_path(
raise SHACLPathError(f"Cannot parse {repr(path_identifier)} as a SHACL Path.")

return path


def _build_path_component(
graph: Graph, path_component: URIRef | Path
) -> IdentifiedNode:
"""
Helper method that implements the recursive component of SHACL path
triple construction.
:param graph: A :class:`~rdflib.graph.Graph` into which to insert triples
:param graph_component: A :class:`~rdflib.term.URIRef` or
:class:`~rdflib.paths.Path` that is part of a path expression
:return: The :class:`~rdflib.term.IdentifiedNode of the resource in the
graph that corresponds to the provided `path_component
"""
# Literals or other types are not allowed
if not isinstance(path_component, (URIRef, Path)):
raise TypeError(
f"Objects of type {type(path_component)} are not valid "
+ "components of a SHACL path."
)

# If the path component is a URI, return it
elif isinstance(path_component, URIRef):
return path_component
# Otherwise, the path component is represented as a blank node
bnode = BNode()

# Handle Sequence Paths
if isinstance(path_component, paths.SequencePath):
# Sequence paths are a Collection directly with at least two items
if len(path_component.args) < 2:
raise SHACLPathError(
"A list of SHACL Sequence Paths must contain at least two path items."
)
Collection(
graph,
bnode,
[_build_path_component(graph, arg) for arg in path_component.args],
)

# Handle Inverse Paths
elif isinstance(path_component, paths.InvPath):
graph.add(
(bnode, SH.inversePath, _build_path_component(graph, path_component.arg))
)

# Handle Alternative Paths
elif isinstance(path_component, paths.AlternativePath):
# Alternative paths are a Collection but referenced by sh:alternativePath
# with at least two items
if len(path_component.args) < 2:
raise SHACLPathError(
"List of SHACL alternate paths must have at least two path items."
)
coll = Collection(
graph,
BNode(),
[_build_path_component(graph, arg) for arg in path_component.args],
)
graph.add((bnode, SH.alternativePath, coll.uri))

# Handle Variable Length Paths
elif isinstance(path_component, paths.MulPath):
# Get the predicate corresponding to the path modifiier
pred = _PATH_MOD_TO_PRED.get(path_component.mod)
if pred is None:
raise SHACLPathError(f"Unknown path modifier {path_component.mod}")
graph.add((bnode, pred, _build_path_component(graph, path_component.path)))

# Return the blank node created for the provided path_component
return bnode


def build_shacl_path(
path: URIRef | Path, target_graph: Graph | None = None
) -> tuple[IdentifiedNode, Graph | None]:
"""
Build the SHACL Path triples for a path given by a :class:`~rdflib.term.URIRef` for
simple paths or a :class:`~rdflib.paths.Path` for complex paths.
Returns an :class:`~rdflib.term.IdentifiedNode` for the path (which should be
the object of a triple with predicate `sh:path`) and the graph into which any
new triples were added.
:param path: A :class:`~rdflib.term.URIRef` or a :class:`~rdflib.paths.Path`
:param target_graph: Optionally, a :class:`~rdflib.graph.Graph` into which to put
constructed triples. If not provided, a new graph will be created
:return: A (`path_identifier`, `graph`) tuple where:
- `path_identifier`: If `path` is a :class:`~rdflib.term.URIRef`, this is simply
the provided `path`. If `path` is a :class:`~rdflib.paths.Path`, this is
the :class:`~rdflib.term.BNode` corresponding to the root of the SHACL
path expression added to the graph.
- `graph`: `None` if `path` is a :class:`~rdflib.term.URIRef` (as no new triples
are constructed). If `path` is a :class:`~rdflib.paths.Path`, this is either the
`target_graph` provided or a new graph into which the path triples were added.
"""
# If a path is a URI, that's the whole path. No graph needs to be constructed.
if isinstance(path, URIRef):
return path, None

# Create a graph if one was not provided
if target_graph is None:
target_graph = Graph()

# Recurse through the path to build the graph representation
return _build_path_component(target_graph, path), target_graph
51 changes: 49 additions & 2 deletions test/test_extras/test_shacl_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import pytest

from rdflib import Graph, URIRef
from rdflib.extras.shacl import SHACLPathError, parse_shacl_path
from rdflib import Graph, Literal, URIRef, paths
from rdflib.compare import graph_diff
from rdflib.extras.shacl import SHACLPathError, build_shacl_path, parse_shacl_path
from rdflib.namespace import SH, Namespace
from rdflib.paths import Path

Expand Down Expand Up @@ -248,3 +249,49 @@ def test_parse_shacl_path(
parse_shacl_path(path_source_data, path_root) # type: ignore[arg-type]
else:
assert parse_shacl_path(path_source_data, path_root) == expected # type: ignore[arg-type]


@pytest.mark.parametrize(
("resource", "path"),
(
# Single SHACL Path
(EX.TestPropShape1, EX.pred1),
(EX.TestPropShape2a, EX.pred1 / EX.pred2 / EX.pred3),
(EX.TestPropShape3, ~EX.pred1),
(EX.TestPropShape4a, EX.pred1 | EX.pred2 | EX.pred3),
(EX.TestPropShape5, EX.pred1 * "*"), # type: ignore[operator]
(EX.TestPropShape6, EX.pred1 * "+"), # type: ignore[operator]
(EX.TestPropShape7, EX.pred1 * "?"), # type: ignore[operator]
# SHACL Path Combinations
(EX.TestPropShape8, ~EX.pred1 * "*"),
(
EX.TestPropShape10a,
~EX.pred1
* "*"
/ (~EX.pred1 * "*" | EX.pred1 | EX.pred2 * "+" | EX.pred3 * "*"), # type: ignore[operator]
),
(TypeError, Literal("Not a valid path")),
(SHACLPathError, paths.SequencePath(SH.targetClass)),
(SHACLPathError, paths.AlternativePath(SH.targetClass)),
),
)
def test_build_shacl_path(
path_source_data: Graph, resource: URIRef | type, path: Union[URIRef, Path]
):
if isinstance(resource, type):
with pytest.raises(resource): # type: ignore[arg-type]
build_shacl_path(path) # type: ignore[arg-type]
else:
expected_path_root = path_source_data.value(resource, SH.path)
actual_path_root, actual_path_graph = build_shacl_path(path)
if isinstance(expected_path_root, URIRef):
assert actual_path_root == expected_path_root
assert actual_path_graph is None
else:
assert isinstance(actual_path_graph, Graph)
expected_path_graph = path_source_data.cbd(expected_path_root)
in_both, in_first, in_second = graph_diff(
expected_path_graph, actual_path_graph
)
assert len(in_first) == 0
assert len(in_second) == 0

0 comments on commit c0f768f

Please sign in to comment.