Skip to content

Commit

Permalink
add rdf convert (#63)
Browse files Browse the repository at this point in the history
* add rdf convert

- cli command "convert" converts a given data contract into a RDF representation
- add rdflib 7.0.0 dependency

I choose to implement a new cli function convert over extending the export function because the goal was to get an RDF representation of a given contract rather than creating a schema representation of the data model. Further exploration into the RDF topic might involve adding export functionality using the RDB2RDF mapping (see https://www.w3.org/2001/sw/wiki/RDB2RDF).

The convert function maps a data contract and its properties into the concepts DataContract, Server, Model, Field, Contact, Info, Terms and Example.

For example the following data contract

---------------------------------
dataContractSpecification: 0.9.2
id: orders-unit-test
info:
  title: Orders Unit Test
  version: 1.0.0
---------------------------------

Is mapped to the following RDF triple

---------------------------------
@Prefix dc1: <https://datacontract.com/DataContractSpecification/0.9.2/> .

<orders-unit-test> a dc1:DataContract ;
    dc1:dataContractSpecification "0.9.2" ;
    dc1:id "orders-unit-test" ;
    dc1:info [ a dc1:Info ;
            dc1:title "Orders Unit Test" ;
            dc1:version "1.0.0" ] ;
---------------------------------

Each model of a data contract is mapped to a single Model instance.

--------------------------------
models:
  orders:
    description: The orders model
  line_items:
    description: The line items model
--------------------------------

is mapped into

--------------------------------
@Prefix dc1: <https://datacontract.com/DataContractSpecification/0.9.2/> .

<orders> a dc1:Model ;
    dc1:description "The orders model" ;

<line_items> a dc1:Model ;
    dc1:description "The line items model" ;
--------------------------------

* bugfix in data_contract.convert

- error handling was using the wrong parameter

* add mr review feedback

- remove convert function
- add rdf export function
- add optional parameter rdf_base to export function
- fix pipeline error missing 1 required posiutional argument base
- add description for RDF export in README.md
- add potential use case descriptions for RDF export in README.md
- rename test_convert_rdf.py to test_export_rdf.py

---------

Co-authored-by: Florian Thiemer <[email protected]>
  • Loading branch information
florian23 and FlorianThiemerAssecor authored Mar 2, 2024
1 parent 1508742 commit 2ea617b
Show file tree
Hide file tree
Showing 10 changed files with 600 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- export to rdf (#52)

This is a hugh step forward, we now support testing Kafka messages.
We start with JSON messages, Avro and Protobuf will follow.
Expand Down
50 changes: 37 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,20 +355,44 @@ datacontract export --format dbt

Available export options:

| Type | Description | Status |
|--------------------|------------------------------------------------|--------|
| `jsonschema` | Export to JSON Schema | ✅ |
| `odcs` | Export to Open Data Contract Standard (ODCS) | ✅ |
| `sodacl` | Export to SodaCL quality checks in YAML format | ✅ |
| `dbt` | Export to dbt models in YAML format | ✅ |
| `dbt-sources` | Export to dbt sources in YAML format | ✅ |
| `dbt-staging-sql` | Export to dbt staging SQL models | ✅ |
| `avro` | Export to AVRO models | TBD |
| `pydantic` | Export to pydantic models | TBD |
| `sql` | Export to SQL DDL | TBD |
| `protobuf` | Export to Protobuf | TBD |
| Missing something? | Please create an issue on GitHub | TBD |
| Type | Description | Status |
|--------------------|---------------------------------------------------------|----------|
| `jsonschema` | Export to JSON Schema | ✅ |
| `odcs` | Export to Open Data Contract Standard (ODCS) | ✅ |
| `sodacl` | Export to SodaCL quality checks in YAML format | ✅ |
| `dbt` | Export to dbt models in YAML format | ✅ |
| `dbt-sources` | Export to dbt sources in YAML format | ✅ |
| `dbt-staging-sql` | Export to dbt staging SQL models | ✅ |
| `rdf` | Export data contract to RDF representation in N3 format | ✅ |
| `avro` | Export to AVRO models | TBD |
| `pydantic` | Export to pydantic models | TBD |
| `sql` | Export to SQL DDL | TBD |
| `protobuf` | Export to Protobuf | TBD |
| Missing something? | Please create an issue on GitHub | TBD |

#### RDF

The export function converts a given data contract into a RDF representation. You have the option to
add a base_url which will be used as the default prefix to resolve relative IRIs inside the document.

```shell
datacontract export --format rdf --rdf-base https://www.example.com/ datacontract.yaml
```

The data contract is mapped onto the following concepts of a yet to be defined Data Contract
Ontology named https://datacontract.com/DataContractSpecification/ :
- DataContract
- Server
- Model

Having the data contract inside an RDF Graph gives us access the following use cases:
- Interoperability with other data contract specification formats
- Store data contracts inside a knowledge graph
- Enhance a semantic search to find and retrieve data contracts
- Linking model elements to already established ontologies and knowledge
- Using full power of OWL to reason about the graph structure of data contracts
- Apply graph algorithms on multiple data contracts (Find similar data contracts, find "gatekeeper"
data products, find the true domain owner of a field attribute)

### Imports

Expand Down
6 changes: 4 additions & 2 deletions datacontract/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
from importlib import metadata
from typing import Iterable
from typing import Iterable, Optional

import typer
from click import Context
Expand Down Expand Up @@ -123,20 +123,22 @@ class ExportFormat(str, Enum):
dbt_sources = "dbt-sources"
dbt_staging_sql = "dbt-staging-sql"
odcs = "odcs"
rdf = "rdf"


@app.command()
def export(
format: Annotated[ExportFormat, typer.Option(help="The export format.")],
server: Annotated[str, typer.Option(help="The server name to export.")] = None,
rdf_base: Annotated[Optional[str], typer.Option(help="The base URI used to generate the RDF graph.")] = "",
location: Annotated[
str, typer.Argument(help="The location (url or path) of the data contract yaml.")] = "datacontract.yaml",
):
"""
Convert data contract to a specific format. Prints to stdout.
"""
# TODO exception handling
result = DataContract(data_contract_file=location, server=server).export(format)
result = DataContract(data_contract_file=location, server=server).export(format, rdf_base)
print(result)


Expand Down
5 changes: 4 additions & 1 deletion datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datacontract.export.odcs_converter import to_odcs
from datacontract.export.sodacl_converter import to_sodacl
from datacontract.imports.sql_importer import import_sql
from datacontract.export.rdf_converter import to_rdf
from datacontract.integration.publish_datamesh_manager import \
publish_datamesh_manager
from datacontract.lint import resolve
Expand Down Expand Up @@ -159,7 +160,7 @@ def get_data_contract_specification(self):
return resolve.resolve_data_contract(self._data_contract_file, self._data_contract_str,
self._data_contract, self._schema_location)

def export(self, export_format) -> str:
def export(self, export_format, rdf_base) -> str:
data_contract = resolve.resolve_data_contract(self._data_contract_file, self._data_contract_str,
self._data_contract)
if export_format == "jsonschema":
Expand All @@ -179,6 +180,8 @@ def export(self, export_format) -> str:
return to_dbt_staging_sql(data_contract)
if export_format == "odcs":
return to_odcs(data_contract)
if export_format == "rdf":
return to_rdf(data_contract, rdf_base).serialize(format='n3')
else:
print(f"Export format {export_format} not supported.")
return ""
Expand Down
147 changes: 147 additions & 0 deletions datacontract/export/rdf_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from typing import Dict
import inspect
from pydantic import BaseModel
from rdflib import Graph, Literal, BNode, RDF, URIRef, Namespace

from datacontract.model.data_contract_specification import \
DataContractSpecification, Model, Field


def is_literal(property_name):
return property_name in ["dataContractSpecification", "title", "version", "description", "name", "url", "type",
"location", "format", "delimiter", "usage", "limitations",
"billing", "noticePeriod", "required", "unique", "minLength", "maxLength", "example",
"pii", "classification", "data", "enum", "minimum", "maximum", "patterns"]


def is_uriref(property_name):
return property_name in ["model", "domain", "owner"]


def to_rdf(data_contract_spec: DataContractSpecification, base):
if base is not None:
g = Graph(base=base)
else:
g = Graph(base=Namespace(""))

dc = Namespace("https://datacontract.com/DataContractSpecification/0.9.2/")
dcx = Namespace("https://datacontract.com/DataContractSpecification/0.9.2/Extension/")

g.bind("dc", dc)
g.bind("dcx", dcx)

this_contract = URIRef(data_contract_spec.id)

g.add((this_contract, dc.dataContractSpecification, Literal(data_contract_spec.dataContractSpecification)))
g.add((this_contract, dc.id, Literal(data_contract_spec.id)))
g.add((this_contract, RDF.type, URIRef(dc + "DataContract")))

add_info(contract=this_contract, info=data_contract_spec.info, graph=g, dc=dc, dcx=dcx)

if data_contract_spec.terms is not None:
add_terms(contract=this_contract, terms=data_contract_spec.terms, graph=g, dc=dc, dcx=dcx)

for server_name, server in data_contract_spec.servers.items():
add_server(contract=this_contract, server=server, server_name=server_name, graph=g, dc=dc, dcx=dcx)

for model_name, model in data_contract_spec.models.items():
add_model(contract=this_contract, model=model, model_name=model_name, graph=g, dc=dc, dcx=dcx)

for example in data_contract_spec.examples:
add_example(contract=this_contract, example=example, graph=g, dc=dc, dcx=dcx)

g.commit()
g.close()

return g


def add_example(contract, example, graph, dc, dcx):
an_example = BNode()
graph.add((contract, dc['example'], an_example))
graph.add((an_example, RDF.type, URIRef(dc + "Example")))
for example_property in example.model_fields:
add_triple(sub=an_example, pred=example_property, obj=example, graph=graph, dc=dc, dcx=dcx)


def add_triple(sub, pred, obj, graph, dc, dcx):
if pred == "ref":
pass
elif isinstance(getattr(obj, pred), list):
for item in getattr(obj, pred):
add_predicate(sub=sub, pred=pred, obj=item, graph=graph, dc=dc, dcx=dcx)
elif isinstance(getattr(obj, pred), dict):
pass
else:
add_predicate(sub=sub, pred=pred, obj=obj, graph=graph, dc=dc, dcx=dcx)


def add_model(contract, model, model_name, graph, dc, dcx):
a_model = URIRef(model_name)
graph.add((contract, dc['model'], a_model))
graph.add((a_model, dc.description, Literal(model.description)))
graph.add((a_model, RDF.type, URIRef(dc + "Model")))
for field_name, field in model.fields.items():
a_field = BNode()
graph.add((a_model, dc['field'], a_field))
graph.add((a_field, RDF.type, URIRef(dc + "Field")))
graph.add((a_field, dc['name'], Literal(field_name)))
for field_property in field.model_fields:
add_triple(sub=a_field, pred=field_property, obj=field, graph=graph, dc=dc, dcx=dcx)


def add_server(contract, server, server_name, graph, dc, dcx):
a_server = URIRef(server_name)
graph.add((contract, dc.server, a_server))
graph.add((a_server, RDF.type, URIRef(dc + "Server")))
for server_property_name in server.model_fields:
add_triple(sub=a_server, pred=server_property_name, obj=server, graph=graph, dc=dc, dcx=dcx)


def add_terms(contract, terms, graph, dc, dcx):
bnode_terms = BNode()
graph.add((contract, dc.terms, bnode_terms))
graph.add((bnode_terms, RDF.type, URIRef(dc + "Terms")))
for term_name in terms.model_fields:
add_triple(sub=bnode_terms, pred=term_name, obj=terms, graph=graph, dc=dc, dcx=dcx)


def add_info(contract, info, graph, dc, dcx):
bnode_info = BNode()
graph.add((contract, dc.info, bnode_info))
graph.add((bnode_info, RDF.type, URIRef(dc + "Info")))
graph.add((bnode_info, dc.title, Literal(info.title)))
graph.add((bnode_info, dc.description, Literal(info.description)))
graph.add((bnode_info, dc.version, Literal(info.version)))

# add owner
owner = URIRef(info.owner)
graph.add((bnode_info, dc.owner, owner))

# add contact
contact = BNode()
graph.add((bnode_info, dc.contact, contact))
graph.add((contact, RDF.type, URIRef(dc + "Contact")))
for contact_property in info.contact.model_fields:
add_triple(sub=contact, pred=contact_property, obj=info.contact, graph=graph, dc=dc, dcx=dcx)


def add_predicate(sub, pred, obj, graph, dc, dcx):
if isinstance(obj, BaseModel):
if getattr(obj, pred) is not None:
if is_literal(pred):
graph.add((sub, dc[pred], Literal(getattr(obj, pred))))
elif is_uriref(pred):
graph.add((sub, dc[pred], URIRef(getattr(obj, pred))))
else:
# treat it as an extension
graph.add((sub, dcx[pred], Literal(getattr(obj, pred))))
else:
# assume primitive
if is_literal(pred):
graph.add((sub, dc[pred], Literal(obj)))
elif is_uriref(pred):
graph.add((sub, dc[pred], URIRef(obj)))
else:
# treat it as an extension
graph.add((sub, dcx[pred], Literal(obj)))
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"fastjsonschema~=2.19.1",
"python-dotenv~=1.0.0",
"s3fs==2024.2.0",
"rdflib==7.0.0",
]

[project.optional-dependencies]
Expand Down
Loading

0 comments on commit 2ea617b

Please sign in to comment.