Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add rdf convert #63

Merged
merged 4 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- export to ODCS (#49).
- test - show a test summary table.
- lint - Support local schema (#46).
- convert to rdf (#52)
florian23 marked this conversation as resolved.
Show resolved Hide resolved

## [0.9.4] - 2024-02-18

Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,15 @@ Available export options:
| `sql` | Export to SQL DDL | TBD |
| `protobuf` | Export to Protobuf | TBD |

### Convert

The convert function converts a given data contract into a RDF representation.

```shell
datacontract --format rdf --base https://www.example.com/ datacontract.yaml
```


## Development Setup

Python base interpreter should be 3.11.x (unless working on 3.12 release candidate).
Expand Down
18 changes: 17 additions & 1 deletion datacontract/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
from importlib import metadata
from typing import Iterable
from typing import Iterable, Optional

import typer
from click import Context
Expand Down Expand Up @@ -133,6 +133,22 @@ def export(
result = DataContract(data_contract_file=location).export(format)
print(result)

class ConvertFormat(str, Enum):
rdf = "rdf"

@app.command()
def convert(
format: Annotated[ConvertFormat, typer.Option(help="The format to convert the data contract to.")],
base: Annotated[Optional[str], typer.Option(help="The base URI used to generate the RDF graph.")] = "",
location: Annotated[
str, typer.Argument(help="The location (url or path) of the data contract yaml.")] = "datacontract.yaml",
):
"""
Convert the data contract to a specific format. Prints to stdout
"""
result = DataContract(data_contract_file=location).convert(format, base)
print(result)


def _handle_result(run):
_print_table(run)
Expand Down
16 changes: 15 additions & 1 deletion datacontract/data_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datacontract.export.jsonschema_converter import to_jsonschema
from datacontract.export.odcs_converter import to_odcs
from datacontract.export.sodacl_converter import to_sodacl
from datacontract.export.rdf_converter import to_rdf
from datacontract.integration.publish_datamesh_manager import \
publish_datamesh_manager
from datacontract.lint import resolve
Expand Down Expand Up @@ -145,7 +146,7 @@ def test(self) -> Run:
def diff(self, other):
pass

def export(self, export_format) -> str:
def export(self, export_format, base) -> str:
data_contract = resolve.resolve_data_contract(self._data_contract_file, self._data_contract_str,
self._data_contract)
if export_format == "jsonschema":
Expand All @@ -162,6 +163,19 @@ def export(self, export_format) -> str:
print(f"Export format {export_format} not supported.")
return ""

def convert(self, convert_format, base) -> str:
data_contract = resolve.resolve_data_contract(self._data_contract_file, self._data_contract_str,
self._data_contract)
if convert_format == "rdf":
if base is not None:
return to_rdf(data_contract, base).serialize(format="n3")
else:
return to_rdf(data_contract, "").serialize(format="n3")
else:
print(f"Convert format {convert_format} not supported.")
return ""


def _get_examples_server(self, data_contract, run, tmp_dir):
run.log_info(f"Copying examples to files in temporary directory {tmp_dir}")
format = "json"
Expand Down
147 changes: 147 additions & 0 deletions datacontract/export/rdf_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from typing import Dict
import inspect
from pydantic import BaseModel
from rdflib import Graph, Literal, BNode, RDF, URIRef, Namespace

from datacontract.model.data_contract_specification import \
DataContractSpecification, Model, Field


def is_literal(property_name):
return property_name in ["dataContractSpecification", "title", "version", "description", "name", "url", "type",
"location", "format", "delimiter", "usage", "limitations",
"billing", "noticePeriod", "required", "unique", "minLength", "maxLength", "example",
"pii", "classification", "data", "enum", "minimum", "maximum", "patterns"]


def is_uriref(property_name):
return property_name in ["model", "domain", "owner"]


def to_rdf(data_contract_spec: DataContractSpecification, base):
if base is not None:
g = Graph(base=base)
else:
g = Graph(base=Namespace(""))

dc = Namespace("https://datacontract.com/DataContractSpecification/0.9.2/")
dcx = Namespace("https://datacontract.com/DataContractSpecification/0.9.2/Extension/")

g.bind("dc", dc)
g.bind("dcx", dcx)

this_contract = URIRef(data_contract_spec.id)

g.add((this_contract, dc.dataContractSpecification, Literal(data_contract_spec.dataContractSpecification)))
g.add((this_contract, dc.id, Literal(data_contract_spec.id)))
g.add((this_contract, RDF.type, URIRef(dc + "DataContract")))

add_info(contract=this_contract, info=data_contract_spec.info, graph=g, dc=dc, dcx=dcx)

if data_contract_spec.terms is not None:
add_terms(contract=this_contract, terms=data_contract_spec.terms, graph=g, dc=dc, dcx=dcx)

for server_name, server in data_contract_spec.servers.items():
add_server(contract=this_contract, server=server, server_name=server_name, graph=g, dc=dc, dcx=dcx)

for model_name, model in data_contract_spec.models.items():
add_model(contract=this_contract, model=model, model_name=model_name, graph=g, dc=dc, dcx=dcx)

for example in data_contract_spec.examples:
add_example(contract=this_contract, example=example, graph=g, dc=dc, dcx=dcx)

g.commit()
g.close()

return g


def add_example(contract, example, graph, dc, dcx):
an_example = BNode()
graph.add((contract, dc['example'], an_example))
graph.add((an_example, RDF.type, URIRef(dc + "Example")))
for example_property in example.model_fields:
add_triple(sub=an_example, pred=example_property, obj=example, graph=graph, dc=dc, dcx=dcx)


def add_triple(sub, pred, obj, graph, dc, dcx):
if pred == "ref":
pass
elif isinstance(getattr(obj, pred), list):
for item in getattr(obj, pred):
add_predicate(sub=sub, pred=pred, obj=item, graph=graph, dc=dc, dcx=dcx)
elif isinstance(getattr(obj, pred), dict):
pass
else:
add_predicate(sub=sub, pred=pred, obj=obj, graph=graph, dc=dc, dcx=dcx)


def add_model(contract, model, model_name, graph, dc, dcx):
a_model = URIRef(model_name)
graph.add((contract, dc['model'], a_model))
graph.add((a_model, dc.description, Literal(model.description)))
graph.add((a_model, RDF.type, URIRef(dc + "Model")))
for field_name, field in model.fields.items():
a_field = BNode()
graph.add((a_model, dc['field'], a_field))
graph.add((a_field, RDF.type, URIRef(dc + "Field")))
graph.add((a_field, dc['name'], Literal(field_name)))
for field_property in field.model_fields:
add_triple(sub=a_field, pred=field_property, obj=field, graph=graph, dc=dc, dcx=dcx)


def add_server(contract, server, server_name, graph, dc, dcx):
a_server = URIRef(server_name)
graph.add((contract, dc.server, a_server))
graph.add((a_server, RDF.type, URIRef(dc + "Server")))
for server_property_name in server.model_fields:
add_triple(sub=a_server, pred=server_property_name, obj=server, graph=graph, dc=dc, dcx=dcx)


def add_terms(contract, terms, graph, dc, dcx):
bnode_terms = BNode()
graph.add((contract, dc.terms, bnode_terms))
graph.add((bnode_terms, RDF.type, URIRef(dc + "Terms")))
for term_name in terms.model_fields:
add_triple(sub=bnode_terms, pred=term_name, obj=terms, graph=graph, dc=dc, dcx=dcx)


def add_info(contract, info, graph, dc, dcx):
bnode_info = BNode()
graph.add((contract, dc.info, bnode_info))
graph.add((bnode_info, RDF.type, URIRef(dc + "Info")))
graph.add((bnode_info, dc.title, Literal(info.title)))
graph.add((bnode_info, dc.description, Literal(info.description)))
graph.add((bnode_info, dc.version, Literal(info.version)))

# add owner
owner = URIRef(info.owner)
graph.add((bnode_info, dc.owner, owner))

# add contact
contact = BNode()
graph.add((bnode_info, dc.contact, contact))
graph.add((contact, RDF.type, URIRef(dc + "Contact")))
for contact_property in info.contact.model_fields:
add_triple(sub=contact, pred=contact_property, obj=info.contact, graph=graph, dc=dc, dcx=dcx)


def add_predicate(sub, pred, obj, graph, dc, dcx):
if isinstance(obj, BaseModel):
if getattr(obj, pred) is not None:
if is_literal(pred):
graph.add((sub, dc[pred], Literal(getattr(obj, pred))))
elif is_uriref(pred):
graph.add((sub, dc[pred], URIRef(getattr(obj, pred))))
else:
# treat it as an extension
graph.add((sub, dcx[pred], Literal(getattr(obj, pred))))
else:
# assume primitive
if is_literal(pred):
graph.add((sub, dc[pred], Literal(obj)))
elif is_uriref(pred):
graph.add((sub, dc[pred], URIRef(obj)))
else:
# treat it as an extension
graph.add((sub, dcx[pred], Literal(obj)))
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies = [
"fastjsonschema~=2.19.1",
"python-dotenv~=1.0.0",
"s3fs==2024.2.0",
"rdflib==7.0.0",
]

[project.optional-dependencies]
Expand Down
129 changes: 129 additions & 0 deletions tests/examples/export/rdf/datacontract-complex.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
dataContractSpecification: 0.9.2
id: orders-latest
info:
title: Orders Latest
version: 1.0.0
description: |
Successful customer orders in the webshop. All orders since 2020-01-01. Orders with their line items are in their current state (no history included).
owner: urn:acme:CheckoutTeam
contact:
name: John Doe (Data Product Owner)
url: https://teams.microsoft.com/l/channel/acme/checkout
servers:
production:
type: "s3"
endpointUrl: __S3_ENDPOINT_URL__
location: "s3://multiple-bucket/examples/s3-json-multiple-models/data/{model}/*.json"
format: "json"
delimiter: "new_line"
terms:
usage: >
Data can be used for reports, analytics and machine learning use cases.
Order may be linked and joined by other tables
limitations: >
Not suitable for real-time use cases.
Data may not be used to identify individual customers.
Max data processing per day: 10 TiB
billing: 5000 USD per month
noticePeriod: P3M
models:
orders:
description: One record per order. Includes cancelled and deleted orders.
type: table
fields:
order_id:
$ref: '#/definitions/order_id'
required: true
unique: true
order_timestamp:
description: The business timestamp in UTC when the order was successfully registered in the source system and the payment was successful.
type: timestamp
required: true
order_total:
description: Total amount the smallest monetary unit (e.g., cents).
type: long
required: true
customer_id:
description: Unique identifier for the customer.
type: text
minLength: 10
maxLength: 20
customer_email_address:
description: The email address, as entered by the customer. The email address was not verified.
type: text
format: email
required: true
line_items:
description: A single article that is part of an order.
type: table
fields:
lines_item_id:
type: text
description: Primary key of the lines_item_id table
required: true
unique: true
order_id:
$ref: '#/definitions/order_id'
sku:
description: The purchased article number
$ref: '#/definitions/sku'
definitions:
order_id:
domain: checkout
name: order_id
title: Order ID
type: text
format: uuid
description: An internal ID that identifies an order in the online shop.
example: 243c25e5-a081-43a9-aeab-6d5d5b6cb5e2
pii: true
classification: restricted
sku:
domain: inventory
name: sku
title: Stock Keeping Unit
type: text
pattern: ^[A-Za-z0-9]{8,14}$
example: "96385074"
description: |
A Stock Keeping Unit (SKU) is an internal unique identifier for an article.
It is typically associated with an article's barcode, such as the EAN/GTIN.
examples:
- type: csv # csv, json, yaml, custom
model: orders
data: |- # expressed as string or inline yaml or via "$ref: data.csv"
order_id,order_timestamp,order_total
"1001","2023-09-09T08:30:00Z",2500
"1002","2023-09-08T15:45:00Z",1800
"1003","2023-09-07T12:15:00Z",3200
"1004","2023-09-06T19:20:00Z",1500
"1005","2023-09-05T10:10:00Z",4200
"1006","2023-09-04T14:55:00Z",2800
"1007","2023-09-03T21:05:00Z",1900
"1008","2023-09-02T17:40:00Z",3600
"1009","2023-09-01T09:25:00Z",3100
"1010","2023-08-31T22:50:00Z",2700
- type: csv
model: line_items
data: |-
lines_item_id,order_id,sku
"1","1001","5901234123457"
"2","1001","4001234567890"
"3","1002","5901234123457"
"4","1002","2001234567893"
"5","1003","4001234567890"
"6","1003","5001234567892"
"7","1004","5901234123457"
"8","1005","2001234567893"
"9","1005","5001234567892"
"10","1005","6001234567891"
quality:
type: SodaCL # data quality check format: SodaCL, montecarlo, custom
specification: # expressed as string or inline yaml or via "$ref: checks.yaml"
checks for orders:
- freshness(order_timestamp) < 24h
- row_count >= 5000
- duplicate_count(order_id) = 0
checks for line_items:
- values in (order_id) must exist in orders (order_id)
- row_count >= 5000
Loading
Loading