2025-12-01

This commit is contained in:
2026-03-17 14:58:51 -06:00
parent 183e865f8b
commit 4b82b57113
6846 changed files with 954887 additions and 162606 deletions
@@ -0,0 +1,77 @@
from __future__ import annotations
import codecs
import getopt
import sys
import time
from typing import TextIO, Union
import rdflib
from rdflib.util import guess_format
def _help():
    """Write the generic usage message for the command-line tools to stderr."""
    # The input files are named on the command line (not "STDOUT" as the
    # message used to claim); stdin is only the fallback when none are given.
    sys.stderr.write(
        """
program.py [-f <format>] [-o <output>] [files...]
Read RDF files given on the command line - does something to the resulting graph
If no files are given, read from stdin
-o specifies file for output, if not given stdout is used
-f specifies parser to use, if not given it is guessed from extension
"""
    )
def main(target, _help=_help, options="", stdin=True):
    """
    A main function for tools that read RDF from files given on commandline
    or from STDIN (if stdin parameter is true)

    The command line (``sys.argv[1:]``) is parsed with ``getopt`` using the
    short options ``h``, ``f:`` and ``o:`` plus whatever is given in
    ``options``. All inputs are parsed into a single graph, which is then
    handed to ``target``.

    :param target: callable invoked as ``target(graph, out, args)`` where
        ``out`` is the output stream and ``args`` is the list of
        ``(option, value)`` pairs returned by ``getopt``.
    :param _help: callable that prints the usage message; invoked for ``-h``,
        after which the process exits with status ``-1``.
    :param options: additional ``getopt`` short-option characters.
    :param stdin: if true, read from standard input when no files are named.
    """
    args, files = getopt.getopt(sys.argv[1:], "hf:o:" + options)
    dargs = dict(args)

    if "-h" in dargs:
        _help()
        sys.exit(-1)

    g = rdflib.Graph()

    # Explicitly requested parser, or None to guess one per input file.
    f = dargs.get("-f")

    out: Union[TextIO, codecs.StreamReaderWriter]
    owns_output = "-o" in dargs
    if owns_output:
        sys.stderr.write("Output to %s\n" % dargs["-o"])
        out = codecs.open(dargs["-o"], "w", "utf-8")
    else:
        out = sys.stdout

    try:
        start = time.time()
        if not files and stdin:
            sys.stderr.write("Reading from stdin as %s..." % f)
            g.parse(sys.stdin, format=f)
            sys.stderr.write("[done]\n")
        else:
            size = 0
            for x in files:
                # Guess the format for *each* file; storing the first guess
                # in ``f`` would force it onto every following file even
                # when their extensions differ.
                file_format = f if f is not None else guess_format(x)
                start1 = time.time()
                sys.stderr.write("Loading %s as %s... " % (x, file_format))
                g.parse(x, format=file_format)
                sys.stderr.write(
                    "done.\t(%d triples\t%.2f seconds)\n"
                    % (len(g) - size, time.time() - start1)
                )
                size = len(g)

        sys.stderr.write(
            "Loaded a total of %d triples in %.2f seconds.\n"
            % (len(g), time.time() - start)
        )

        target(g, out, args)
    finally:
        # Only close a file we opened ourselves; never close sys.stdout.
        if owns_output:
            out.close()
@@ -0,0 +1,257 @@
"""
A Describer is a stateful utility for creating RDF statements in a
semi-declarative manner. It has methods for creating literal values, rel and
rev resource relations (somewhat resembling RDFa).
The `Describer.rel` and `Describer.rev` methods return a context manager which sets the current
about to the referenced resource for the context scope (for use with the
``with`` statement).
Full example in the ``to_rdf`` method below::
>>> import datetime
>>> from rdflib.graph import Graph
>>> from rdflib.namespace import Namespace, RDFS, FOAF
>>>
>>> ORG_URI = "http://example.org/"
>>>
>>> CV = Namespace("http://purl.org/captsolo/resume-rdf/0.2/cv#")
>>>
>>> class Person:
... def __init__(self):
... self.first_name = "Some"
... self.last_name = "Body"
... self.username = "some1"
... self.presentation = "Just a Python & RDF hacker."
... self.image = "/images/persons/" + self.username + ".jpg"
... self.site = "http://example.net/"
... self.start_date = datetime.date(2009, 9, 4)
... def get_full_name(self):
... return " ".join([self.first_name, self.last_name])
... def get_absolute_url(self):
... return "/persons/" + self.username
... def get_thumbnail_url(self):
... return self.image.replace('.jpg', '-thumb.jpg')
...
... def to_rdf(self):
... graph = Graph()
... graph.bind('foaf', FOAF)
... graph.bind('cv', CV)
... lang = 'en'
... d = Describer(graph, base=ORG_URI)
... d.about(self.get_absolute_url()+'#person')
... d.rdftype(FOAF.Person)
... d.value(FOAF.name, self.get_full_name())
... d.value(FOAF.givenName, self.first_name)
... d.value(FOAF.familyName, self.last_name)
... d.rel(FOAF.homepage, self.site)
... d.value(RDFS.comment, self.presentation, lang=lang)
... with d.rel(FOAF.depiction, self.image):
... d.rdftype(FOAF.Image)
... d.rel(FOAF.thumbnail, self.get_thumbnail_url())
... with d.rev(CV.aboutPerson):
... d.rdftype(CV.CV)
... with d.rel(CV.hasWorkHistory):
... d.value(CV.startDate, self.start_date)
... d.rel(CV.employedIn, ORG_URI+"#company")
... return graph
...
>>> person_graph = Person().to_rdf()
>>> expected = Graph().parse(data='''<?xml version="1.0" encoding="utf-8"?>
... <rdf:RDF
... xmlns:foaf="http://xmlns.com/foaf/0.1/"
... xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
... xmlns:cv="http://purl.org/captsolo/resume-rdf/0.2/cv#"
... xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#">
... <foaf:Person rdf:about="http://example.org/persons/some1#person">
... <foaf:name>Some Body</foaf:name>
... <foaf:givenName>Some</foaf:givenName>
... <foaf:familyName>Body</foaf:familyName>
... <foaf:depiction>
... <foaf:Image
... rdf:about=
... "http://example.org/images/persons/some1.jpg">
... <foaf:thumbnail
... rdf:resource=
... "http://example.org/images/persons/some1-thumb.jpg"/>
... </foaf:Image>
... </foaf:depiction>
... <rdfs:comment xml:lang="en">
... Just a Python &amp; RDF hacker.
... </rdfs:comment>
... <foaf:homepage rdf:resource="http://example.net/"/>
... </foaf:Person>
... <cv:CV>
... <cv:aboutPerson
... rdf:resource="http://example.org/persons/some1#person">
... </cv:aboutPerson>
... <cv:hasWorkHistory>
... <rdf:Description>
... <cv:startDate
... rdf:datatype="http://www.w3.org/2001/XMLSchema#date"
... >2009-09-04</cv:startDate>
... <cv:employedIn rdf:resource="http://example.org/#company"/>
... </rdf:Description>
... </cv:hasWorkHistory>
... </cv:CV>
... </rdf:RDF>
... ''', format="xml")
>>>
>>> from rdflib.compare import isomorphic
>>> isomorphic(person_graph, expected) #doctest: +SKIP
True
"""
from contextlib import contextmanager
from rdflib.graph import Graph
from rdflib.namespace import RDF
from rdflib.term import BNode, Identifier, Literal, URIRef
class Describer:
    """
    Stateful helper that adds statements about a "current subject" to a graph.

    The current subject is the top of an internal stack. ``about`` replaces
    it, while the context managers returned by ``rel`` and ``rev`` push a
    related resource for the duration of a ``with`` block.
    """

    def __init__(self, graph=None, about=None, base=None):
        """
        Create a describer.

        :param graph: graph to add statements to; a new ``Graph`` is created
            if none is given.
        :param about: initial subject; a new ``BNode`` is used if not given.
        :param base: default ``base`` passed on when casting references to
            ``URIRef``.
        """
        if graph is None:
            graph = Graph()
        self.graph = graph
        self.base = base
        self._subjects = []
        self.about(about or None)

    def about(self, subject, **kws):
        """
        Sets the current subject. Will convert the given object into an
        ``URIRef`` if it's not an ``Identifier``.

        Usage::

            >>> d = Describer()
            >>> d._current() #doctest: +ELLIPSIS
            rdflib.term.BNode(...)
            >>> d.about("http://example.org/")
            >>> d._current()
            rdflib.term.URIRef('http://example.org/')

        """
        kws.setdefault("base", self.base)
        subject = cast_identifier(subject, **kws)
        # Replace the innermost subject rather than nesting a new one.
        if self._subjects:
            self._subjects[-1] = subject
        else:
            self._subjects.append(subject)

    def value(self, p, v, **kws):
        """
        Set a literal value for the given property. Will cast the value to an
        ``Literal`` if a plain literal is given.

        Usage::

            >>> from rdflib import URIRef
            >>> from rdflib.namespace import RDF, RDFS
            >>> d = Describer(about="http://example.org/")
            >>> d.value(RDFS.label, "Example")
            >>> d.graph.value(URIRef('http://example.org/'), RDFS.label)
            rdflib.term.Literal('Example')

        """
        v = cast_value(v, **kws)
        self.graph.add((self._current(), p, v))

    def rel(self, p, o=None, **kws):
        """Set an object for the given property. Will convert the given object
        into an ``URIRef`` if it's not an ``Identifier``. If none is given, a
        new ``BNode`` is used.

        Returns a context manager for use in a ``with`` block, within which the
        given object is used as current subject.

        Usage::

            >>> from rdflib import URIRef
            >>> from rdflib.namespace import RDF, RDFS
            >>> d = Describer(about="/", base="http://example.org/")
            >>> _ctxt = d.rel(RDFS.seeAlso, "/about")
            >>> d.graph.value(URIRef('http://example.org/'), RDFS.seeAlso)
            rdflib.term.URIRef('http://example.org/about')

            >>> with d.rel(RDFS.seeAlso, "/more"):
            ...     d.value(RDFS.label, "More")
            >>> (URIRef('http://example.org/'), RDFS.seeAlso,
            ...         URIRef('http://example.org/more')) in d.graph
            True
            >>> d.graph.value(URIRef('http://example.org/more'), RDFS.label)
            rdflib.term.Literal('More')

        """
        kws.setdefault("base", self.base)
        p = cast_identifier(p)
        o = cast_identifier(o, **kws)
        self.graph.add((self._current(), p, o))
        return self._subject_stack(o)

    def rev(self, p, s=None, **kws):
        """
        Same as ``rel``, but uses current subject as *object* of the relation.
        The given resource is still used as subject in the returned context
        manager.

        Usage::

            >>> from rdflib import URIRef
            >>> from rdflib.namespace import RDF, RDFS
            >>> d = Describer(about="http://example.org/")
            >>> with d.rev(RDFS.seeAlso, "http://example.net/"):
            ...     d.value(RDFS.label, "Net")
            >>> (URIRef('http://example.net/'), RDFS.seeAlso,
            ...         URIRef('http://example.org/')) in d.graph
            True
            >>> d.graph.value(URIRef('http://example.net/'), RDFS.label)
            rdflib.term.Literal('Net')

        """
        kws.setdefault("base", self.base)
        p = cast_identifier(p)
        s = cast_identifier(s, **kws)
        self.graph.add((s, p, self._current()))
        return self._subject_stack(s)

    def rdftype(self, t):
        """
        Shorthand for setting rdf:type of the current subject.

        Usage::

            >>> from rdflib import URIRef
            >>> from rdflib.namespace import RDF, RDFS
            >>> d = Describer(about="http://example.org/")
            >>> d.rdftype(RDFS.Resource)
            >>> (URIRef('http://example.org/'),
            ...     RDF.type, RDFS.Resource) in d.graph
            True

        """
        self.graph.add((self._current(), RDF.type, t))

    def _current(self):
        """Return the subject on top of the subject stack."""
        return self._subjects[-1]

    @contextmanager
    def _subject_stack(self, subject):
        """
        Context manager that makes ``subject`` the current subject inside the
        ``with`` block and restores the previous one on exit.
        """
        self._subjects.append(subject)
        try:
            yield None
        finally:
            # Pop even when the block raises; otherwise the nested subject
            # would stay current and later statements would be attached to
            # the wrong resource.
            self._subjects.pop()
def cast_value(v, **kws):
    """
    Return ``v`` as a ``Literal``.

    An existing ``Literal`` is returned unchanged (``kws`` is ignored in that
    case); anything else is wrapped as ``Literal(v, **kws)``.
    """
    if isinstance(v, Literal):
        return v
    return Literal(v, **kws)
def cast_identifier(ref, **kws):
    """
    Return ``ref`` as an ``Identifier``.

    A falsy ``ref`` (such as ``None``) is replaced by a fresh ``BNode``. An
    existing ``Identifier`` is returned as is; any other value is converted
    with ``URIRef(ref, **kws)``.
    """
    if not ref:
        ref = BNode()
    if isinstance(ref, Identifier):
        return ref
    return URIRef(ref, **kws)
@@ -0,0 +1,355 @@
"""Convert (to and) from rdflib graphs to other well known graph libraries.
Currently the following libraries are supported:
- networkx: MultiDiGraph, DiGraph, Graph
- graph_tool: Graph
Doctests in this file are all skipped, as we can't run them conditionally if
networkx or graph_tool are available and they would err otherwise.
see ../../test/test_extras_external_graph_libs.py for conditional tests
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, Dict, List
if TYPE_CHECKING:
from rdflib.graph import Graph
logger = logging.getLogger(__name__)
def _identity(x):
    """Return ``x`` unchanged (the default, no-op node transformation)."""
    return x
def _rdflib_to_networkx_graph(
    graph: Graph,
    nxgraph,
    calc_weights: bool,
    edge_attrs,
    transform_s=_identity,
    transform_o=_identity,
):
    """Shared worker behind the multidigraph, digraph and graph converters.

    Fills ``nxgraph`` in-place with one edge (or one merged edge) per triple.

    Arguments:
        graph: an rdflib.Graph.
        nxgraph: a networkx.Graph/DiGraph/MultiDigraph.
        calc_weights: If True, a new edge gets ``weight`` 1 and every further
            triple merged into that edge increments it. This is meaningful
            for Graph/DiGraph.
        edge_attrs: Callable building the edge data dict from s, p, o.
            A 'triples' entry is handled specially: the lists are merged
            when several triples end up on the same edge.
            It should not produce 'weight' if calc_weights is True.
        transform_s: Callable mapping s to the node used for it.
        transform_o: Callable mapping o to the node used for it.
    """
    assert callable(edge_attrs)
    assert callable(transform_s)
    assert callable(transform_o)
    import networkx as nx

    for s, p, o in graph:
        # Nodes may differ from the raw terms if transformations are given.
        source, target = transform_s(s), transform_o(o)
        existing = nxgraph.get_edge_data(source, target)

        if existing is not None and not isinstance(nxgraph, nx.MultiDiGraph):
            # The edge is already there: fold this triple into its data.
            if calc_weights:
                existing["weight"] += 1
            if "triples" in existing:
                extra = edge_attrs(s, p, o)
                existing["triples"].extend(extra["triples"])
            continue

        # First edge between these nodes (or a multigraph, where every
        # triple becomes its own edge).
        attrs = edge_attrs(s, p, o)
        if calc_weights:
            attrs["weight"] = 1
        nxgraph.add_edge(source, target, **attrs)
def rdflib_to_networkx_multidigraph(
    graph: Graph, edge_attrs=lambda s, p, o: {"key": p}, **kwds
):
    r"""Build a networkx.MultiDiGraph from the given graph.

    Subjects and objects become the nodes of the MultiDiGraph, and each
    triple becomes one edge. With the default ``edge_attrs`` the predicate is
    used as edge key (to identify multi-edges).

    :Parameters:

    - graph: a rdflib.Graph.
    - edge_attrs: Callable receiving 3 variables (s, p, o) that returns the
      dictionary passed to networkx's add_edge(s, o, \*\*attrs) function.

      The default sets the MultiDiGraph key=p.
      If you don't need to re-identify the edge later on, pass
      ``lambda s, p, o: {}``; MultiDiGraph's default keys (increasing ints)
      are used in that case.
    - kwds: forwarded to the shared conversion helper (``transform_s`` /
      ``transform_o``).

    Returns:
        networkx.MultiDiGraph

    >>> from rdflib import Graph, URIRef, Literal
    >>> g = Graph()
    >>> a, b, l = URIRef('a'), URIRef('b'), Literal('l')
    >>> p, q = URIRef('p'), URIRef('q')
    >>> edges = [(a, p, b), (a, q, b), (b, p, a), (b, p, l)]
    >>> for t in edges:
    ...     g.add(t)
    ...
    >>> mdg = rdflib_to_networkx_multidigraph(g)
    >>> len(mdg.edges())
    4
    >>> mdg.has_edge(a, b)
    True
    >>> mdg.has_edge(a, b, key=p)
    True
    >>> mdg.has_edge(a, b, key=q)
    True

    >>> mdg = rdflib_to_networkx_multidigraph(g, edge_attrs=lambda s,p,o: {})
    >>> mdg.has_edge(a, b, key=0)
    True
    >>> mdg.has_edge(a, b, key=1)
    True
    """
    import networkx as nx

    multi_digraph = nx.MultiDiGraph()
    # Weights are never calculated here: parallel edges stay separate.
    _rdflib_to_networkx_graph(graph, multi_digraph, False, edge_attrs, **kwds)
    return multi_digraph
def rdflib_to_networkx_digraph(
    graph: Graph,
    calc_weights: bool = True,
    edge_attrs=lambda s, p, o: {"triples": [(s, p, o)]},
    **kwds,
):
    r"""Build a networkx.DiGraph from the given graph.

    An rdflib.Graph() can hold several triples between the same two nodes,
    while a DiGraph has a single edge per direction. By default that single
    edge gets a 'triples' attribute listing all triples from s to o, and a
    'weight' attribute counting them.

    :Parameters:

    - ``graph``: a rdflib.Graph.
    - ``calc_weights``: If true calculate multi-graph edge-count as edge 'weight'
    - ``edge_attrs``: Callable receiving 3 variables (s, p, o) that returns
      the dictionary passed to networkx's add_edge(s, o, \*\*attrs) function.

      The default sets the 'triples' attribute, which is treated specially:
      its lists are merged. All other attributes of a merged edge are those
      of the first triple only.
      If you don't want the 'triples' attribute for tracking, pass
      ``lambda s, p, o: {}``.
    - ``kwds``: forwarded to the shared conversion helper (``transform_s`` /
      ``transform_o``).

    Returns: networkx.DiGraph

    >>> from rdflib import Graph, URIRef, Literal
    >>> g = Graph()
    >>> a, b, l = URIRef('a'), URIRef('b'), Literal('l')
    >>> p, q = URIRef('p'), URIRef('q')
    >>> edges = [(a, p, b), (a, q, b), (b, p, a), (b, p, l)]
    >>> for t in edges:
    ...     g.add(t)
    ...
    >>> dg = rdflib_to_networkx_digraph(g)
    >>> dg[a][b]['weight']
    2
    >>> sorted(dg[a][b]['triples']) == [(a, p, b), (a, q, b)]
    True
    >>> len(dg.edges())
    3
    >>> dg.size()
    3
    >>> dg.size(weight='weight')
    4.0

    >>> dg = rdflib_to_networkx_digraph(g, False, edge_attrs=lambda s,p,o:{})
    >>> 'weight' in dg[a][b]
    False
    >>> 'triples' in dg[a][b]
    False
    """
    import networkx as nx

    digraph = nx.DiGraph()
    _rdflib_to_networkx_graph(graph, digraph, calc_weights, edge_attrs, **kwds)
    return digraph
def rdflib_to_networkx_graph(
    graph: Graph,
    calc_weights: bool = True,
    edge_attrs=lambda s, p, o: {"triples": [(s, p, o)]},
    **kwds,
):
    r"""Build an (undirected) networkx.Graph from the given graph.

    An rdflib.Graph() can hold several directed triples between the same two
    nodes, while a networkx.Graph has a single edge per node pair. By default
    that single edge gets a 'triples' attribute listing the triples between
    s and o (in either direction), and a 'weight' attribute counting them.

    :Parameters:

    - graph: a rdflib.Graph.
    - calc_weights: If true calculate multi-graph edge-count as edge 'weight'
    - edge_attrs: Callable receiving 3 variables (s, p, o) that returns the
      dictionary passed to networkx's add_edge(s, o, \*\*attrs) function.

      The default sets the 'triples' attribute, which is treated specially:
      its lists are merged. All other attributes of a merged edge are those
      of the first triple only.
      If you don't want the 'triples' attribute for tracking, pass
      ``lambda s, p, o: {}``.
    - kwds: forwarded to the shared conversion helper (``transform_s`` /
      ``transform_o``).

    Returns:
        networkx.Graph

    >>> from rdflib import Graph, URIRef, Literal
    >>> g = Graph()
    >>> a, b, l = URIRef('a'), URIRef('b'), Literal('l')
    >>> p, q = URIRef('p'), URIRef('q')
    >>> edges = [(a, p, b), (a, q, b), (b, p, a), (b, p, l)]
    >>> for t in edges:
    ...     g.add(t)
    ...
    >>> ug = rdflib_to_networkx_graph(g)
    >>> ug[a][b]['weight']
    3
    >>> sorted(ug[a][b]['triples']) == [(a, p, b), (a, q, b), (b, p, a)]
    True
    >>> len(ug.edges())
    2
    >>> ug.size()
    2
    >>> ug.size(weight='weight')
    4.0

    >>> ug = rdflib_to_networkx_graph(g, False, edge_attrs=lambda s,p,o:{})
    >>> 'weight' in ug[a][b]
    False
    >>> 'triples' in ug[a][b]
    False
    """
    import networkx as nx

    undirected = nx.Graph()
    _rdflib_to_networkx_graph(graph, undirected, calc_weights, edge_attrs, **kwds)
    return undirected
def rdflib_to_graphtool(
    graph: Graph,
    v_prop_names: List[str] = ["term"],
    e_prop_names: List[str] = ["term"],
    transform_s=lambda s, p, o: {"term": s},
    transform_p=lambda s, p, o: {"term": p},
    transform_o=lambda s, p, o: {"term": o},
):
    """Build a graph_tool.Graph() from the given graph.

    Every distinct subject or object becomes one vertex, and every triple
    becomes one edge from the subject's vertex to the object's vertex.

    :Parameters:

    - graph: a rdflib.Graph.
    - v_prop_names: names of the vertex properties to create. Defaults to
      ['term'] (see transform_s, transform_o below).
    - e_prop_names: names of the edge properties to create.
    - transform_s: callable taking s, p, o that returns a dictionary with a
      value for each name in v_prop_names. It is called when the vertex for
      s is first created. The default returns {'term': s}, which together
      with v_prop_names = ['term'] stores s as the 'term' property of the
      vertex.
    - transform_p: like transform_s, but for e_prop_names; called for every
      triple. The default returns {'term': p}, which stores p as a property
      of the edge between the vertex for s and the vertex for o.
    - transform_o: like transform_s, for the vertex created for o.

    Returns: graph_tool.Graph()

    >>> from rdflib import Graph, URIRef, Literal
    >>> g = Graph()
    >>> a, b, l = URIRef('a'), URIRef('b'), Literal('l')
    >>> p, q = URIRef('p'), URIRef('q')
    >>> edges = [(a, p, b), (a, q, b), (b, p, a), (b, p, l)]
    >>> for t in edges:
    ...     g.add(t)
    ...
    >>> mdg = rdflib_to_graphtool(g)
    >>> len(list(mdg.edges()))
    4
    >>> from graph_tool import util as gt_util
    >>> vpterm = mdg.vertex_properties['term']
    >>> va = gt_util.find_vertex(mdg, vpterm, a)[0]
    >>> vb = gt_util.find_vertex(mdg, vpterm, b)[0]
    >>> vl = gt_util.find_vertex(mdg, vpterm, l)[0]
    >>> (va, vb) in [(e.source(), e.target()) for e in list(mdg.edges())]
    True
    >>> epterm = mdg.edge_properties['term']
    >>> len(list(gt_util.find_edge(mdg, epterm, p))) == 3
    True
    >>> len(list(gt_util.find_edge(mdg, epterm, q))) == 1
    True

    >>> mdg = rdflib_to_graphtool(
    ...     g,
    ...     e_prop_names=['name'],
    ...     transform_p=lambda s, p, o: {'name': str(p)})
    >>> epterm = mdg.edge_properties['name']
    >>> len(list(gt_util.find_edge(mdg, epterm, str(p)))) == 3
    True
    >>> len(list(gt_util.find_edge(mdg, epterm, str(q)))) == 1
    True
    """
    # pytype error: Can't find module 'graph_tool'.
    import graph_tool as gt  # pytype: disable=import-error

    gt_graph = gt.Graph()

    # Create the property maps first, then register them under their names.
    vertex_props = [
        (name, gt_graph.new_vertex_property("object")) for name in v_prop_names
    ]
    for name, prop_map in vertex_props:
        gt_graph.vertex_properties[name] = prop_map

    edge_props = [
        (name, gt_graph.new_edge_property("object")) for name in e_prop_names
    ]
    for name, prop_map in edge_props:
        gt_graph.edge_properties[name] = prop_map

    vertex_of: Dict[Any, Any] = {}

    def vertex_for(node, transform, s, p, o):
        # Look up the vertex for ``node``; on first sight create it and fill
        # in its properties from ``transform``.
        vertex = vertex_of.get(node)
        if vertex is None:
            vertex = gt_graph.add_vertex()
            vertex_of[node] = vertex
            values = transform(s, p, o)
            for name, prop_map in vertex_props:
                prop_map[vertex] = values[name]
        return vertex

    for s, p, o in graph:
        source = vertex_for(s, transform_s, s, p, o)
        target = vertex_for(o, transform_o, s, p, o)

        edge = gt_graph.add_edge(source, target)
        values = transform_p(s, p, o)
        for name, prop_map in edge_props:
            prop_map[edge] = values[name]

    return gt_graph
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,212 @@
"""
Utilities for interacting with SHACL Shapes Graphs more easily.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Union
from rdflib import BNode, Graph, Literal, URIRef, paths
from rdflib.collection import Collection
from rdflib.namespace import RDF, SH
from rdflib.paths import Path
from rdflib.term import Node
if TYPE_CHECKING:
from rdflib.term import IdentifiedNode
class SHACLPathError(Exception):
    """Raised when a SHACL path cannot be parsed from, or built into, a graph."""
# Map the variable length path operators to the corresponding SHACL path
# predicates. Used when serialising a MulPath back into SHACL triples: the
# path's modifier is looked up here to find the predicate to emit.
_PATH_MOD_TO_PRED = {
    paths.ZeroOrMore: SH.zeroOrMorePath,
    paths.OneOrMore: SH.oneOrMorePath,
    paths.ZeroOrOne: SH.zeroOrOnePath,
}
# This implementation is roughly based on
# pyshacl.helper.sparql_query_helper::SPARQLQueryHelper._shacl_path_to_sparql_path
def parse_shacl_path(
    shapes_graph: Graph,
    path_identifier: Node,
) -> Union[URIRef, Path]:
    """
    Read a SHACL path (e.g. the object of a triple with predicate sh:path)
    out of a :class:`~rdflib.graph.Graph`.

    A path that is just a predicate is returned as a
    :class:`~rdflib.term.URIRef`; anything more complex is returned as a
    :class:`~rdflib.paths.Path`, built recursively.

    :param shapes_graph: A :class:`~rdflib.graph.Graph` containing the path to be parsed
    :param path_identifier: A :class:`~rdflib.term.Node` of the path
    :return: A :class:`~rdflib.term.URIRef` or a :class:`~rdflib.paths.Path`
    :raises TypeError: if ``path_identifier`` is a literal
    :raises SHACLPathError: if the node is ``rdf:nil``, a list has fewer than
        two items, or the node matches no known SHACL path form
    """
    # A literal can never be a path.
    if isinstance(path_identifier, Literal):
        raise TypeError("Literals are not a valid SHACL path.")

    # A URI is a complete (predicate) path on its own.
    if isinstance(path_identifier, URIRef):
        if path_identifier == RDF.nil:
            raise SHACLPathError(
                "A list of SHACL Paths must contain at least two path items."
            )
        return path_identifier

    # Sequence path: the node itself is the head of an RDF list.
    if shapes_graph.value(path_identifier, RDF.first) is not None:
        members = list(shapes_graph.items(path_identifier))
        if len(members) < 2:
            raise SHACLPathError(
                "A list of SHACL Sequence Paths must contain at least two path items."
            )
        return paths.SequencePath(
            *(parse_shacl_path(shapes_graph, member) for member in members)
        )

    # sh:inversePath
    inverse_target = shapes_graph.value(path_identifier, SH.inversePath)
    if inverse_target:
        return paths.InvPath(parse_shacl_path(shapes_graph, inverse_target))

    # sh:alternativePath: points at an RDF list of the alternatives.
    alternatives_head = shapes_graph.value(path_identifier, SH.alternativePath)
    if alternatives_head:
        options = list(shapes_graph.items(alternatives_head))
        if len(options) < 2:
            raise SHACLPathError(
                "List of SHACL alternate paths must have at least two path items."
            )
        return paths.AlternativePath(
            *(parse_shacl_path(shapes_graph, option) for option in options)
        )

    # sh:zeroOrMorePath, sh:oneOrMorePath, sh:zeroOrOnePath, tried in order.
    for predicate, modifier in (
        (SH.zeroOrMorePath, "*"),
        (SH.oneOrMorePath, "+"),
        (SH.zeroOrOnePath, "?"),
    ):
        repeated = shapes_graph.value(path_identifier, predicate)
        if repeated:
            return paths.MulPath(parse_shacl_path(shapes_graph, repeated), modifier)

    # None of the known forms matched.
    raise SHACLPathError(f"Cannot parse {repr(path_identifier)} as a SHACL Path.")
def _build_path_component(
    graph: Graph, path_component: URIRef | Path
) -> IdentifiedNode:
    """
    Helper method that implements the recursive component of SHACL path
    triple construction.

    :param graph: A :class:`~rdflib.graph.Graph` into which to insert triples
    :param path_component: A :class:`~rdflib.term.URIRef` or
        :class:`~rdflib.paths.Path` that is part of a path expression
    :return: The :class:`~rdflib.term.IdentifiedNode` of the resource in the
        graph that corresponds to the provided path_component
    :raises TypeError: if ``path_component`` is neither a URIRef nor a Path
    :raises SHACLPathError: if a sequence or alternative path has fewer than
        two items, a path modifier is unknown, or the kind of path has no
        SHACL representation handled here
    """
    # Literals or other types are not allowed
    if not isinstance(path_component, (URIRef, Path)):
        raise TypeError(
            f"Objects of type {type(path_component)} are not valid "
            + "components of a SHACL path."
        )

    # If the path component is a URI, return it
    if isinstance(path_component, URIRef):
        return path_component

    # Otherwise, the path component is represented as a blank node
    bnode = BNode()

    # Handle Sequence Paths
    if isinstance(path_component, paths.SequencePath):
        # Sequence paths are a Collection directly with at least two items
        if len(path_component.args) < 2:
            raise SHACLPathError(
                "A list of SHACL Sequence Paths must contain at least two path items."
            )
        Collection(
            graph,
            bnode,
            [_build_path_component(graph, arg) for arg in path_component.args],
        )

    # Handle Inverse Paths
    elif isinstance(path_component, paths.InvPath):
        graph.add(
            (bnode, SH.inversePath, _build_path_component(graph, path_component.arg))
        )

    # Handle Alternative Paths
    elif isinstance(path_component, paths.AlternativePath):
        # Alternative paths are a Collection but referenced by sh:alternativePath
        # with at least two items
        if len(path_component.args) < 2:
            raise SHACLPathError(
                "List of SHACL alternate paths must have at least two path items."
            )
        coll = Collection(
            graph,
            BNode(),
            [_build_path_component(graph, arg) for arg in path_component.args],
        )
        graph.add((bnode, SH.alternativePath, coll.uri))

    # Handle Variable Length Paths
    elif isinstance(path_component, paths.MulPath):
        # Get the predicate corresponding to the path modifier
        pred = _PATH_MOD_TO_PRED.get(path_component.mod)
        if pred is None:
            raise SHACLPathError(f"Unknown path modifier {path_component.mod}")
        graph.add((bnode, pred, _build_path_component(graph, path_component.path)))

    # Any other kind of Path is not handled above. Fail loudly instead of
    # returning a blank node that has no triples describing it.
    else:
        raise SHACLPathError(
            f"Paths of type {type(path_component).__name__} cannot be "
            "represented as a SHACL path."
        )

    # Return the blank node created for the provided path_component
    return bnode
def build_shacl_path(
    path: URIRef | Path, target_graph: Graph | None = None
) -> tuple[IdentifiedNode, Graph | None]:
    """
    Write a path as SHACL Path triples.

    ``path`` is a :class:`~rdflib.term.URIRef` for a simple path or a
    :class:`~rdflib.paths.Path` for a complex one. The result is the
    :class:`~rdflib.term.IdentifiedNode` standing for the path (to be used as
    the object of a triple with predicate sh:path) together with the graph
    that received any new triples.

    :param path: A :class:`~rdflib.term.URIRef` or a :class:`~rdflib.paths.Path`
    :param target_graph: Optionally, a :class:`~rdflib.graph.Graph` into which to put
        constructed triples. If not provided, a new graph will be created
    :return: A (path_identifier, graph) tuple where:
        - path_identifier: ``path`` itself if it is a
          :class:`~rdflib.term.URIRef`; otherwise the node at the root of the
          SHACL path expression added to the graph.
        - graph: None if path is a :class:`~rdflib.term.URIRef` (no triples
          are constructed); otherwise the target_graph provided, or the
          newly created graph, holding the path triples.
    """
    # A bare URI is already the whole path; nothing has to be written.
    if isinstance(path, URIRef):
        return path, None

    # Use the caller's graph if there is one, else start a fresh graph.
    destination = Graph() if target_graph is None else target_graph

    # Recursively emit the triples; the root node identifies the path.
    root = _build_path_component(destination, path)
    return root, destination