Add "test --examples" #39

Merged: 11 commits, Feb 14, 2024
Changes from all commits
3 changes: 3 additions & 0 deletions README.md
@@ -48,6 +48,9 @@ $ datacontract lint datacontract.yaml
# execute schema and quality checks
$ datacontract test datacontract.yaml

# execute schema and quality checks on the examples within the contract
$ datacontract test --examples datacontract.yaml

# find differences between two data contracts (Coming Soon)
$ datacontract diff datacontract-v1.yaml datacontract-v2.yaml

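For orientation, the same run can also be triggered from the Python API; the cli.py change below simply forwards the new flag into the DataContract constructor. A minimal sketch, with an assumed file path:

from datacontract.data_contract import DataContract

# Equivalent of `datacontract test --examples datacontract.yaml`:
# the schema and quality checks run against the contract's embedded examples
# instead of a configured server. The file path is illustrative.
run = DataContract(
    data_contract_file="datacontract.yaml",
    examples=True,
).test()
print(run.result)  # e.g. "passed"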
6 changes: 4 additions & 2 deletions datacontract/cli.py
@@ -87,14 +87,16 @@ def test(
"Use the key of the server object in the data contract yaml file "
"to refer to a server, e.g., `production`, or `all` for all "
"servers (default).")] = "all",
examples: Annotated[bool, typer.Option(
help="Run the schema and quality tests on the example data within the data contract.")] = None,
publish: Annotated[str, typer.Option(
help="")] = None,
help="The url to publish the results after the test")] = None,
):
"""
Run schema and quality tests on configured servers.
"""
print(f"Testing {location}")
run = DataContract(data_contract_file=location, publish_url=publish).test()
run = DataContract(data_contract_file=location, publish_url=publish, examples=examples).test()
_handle_result(run)


74 changes: 60 additions & 14 deletions datacontract/data_contract.py
@@ -1,5 +1,9 @@
import json
import logging
import tempfile
from typing import List

import yaml

from datacontract.engines.datacontract.check_that_datacontract_contains_valid_servers_configuration import \
check_that_datacontract_contains_valid_server_configuration
@@ -13,7 +17,7 @@
from datacontract.lint import resolve
from datacontract.lint.linters.example_model_linter import ExampleModelLinter
from datacontract.model.data_contract_specification import \
DataContractSpecification
DataContractSpecification, Server
from datacontract.model.exceptions import DataContractException
from datacontract.model.run import \
Run, Check
@@ -26,13 +30,15 @@ def __init__(
data_contract_str: str = None,
data_contract: DataContractSpecification = None,
server: str = None,
examples: bool = False,
publish_url: str = None,
spark: str = None,
):
self._data_contract_file = data_contract_file
self._data_contract_str = data_contract_str
self._data_contract = data_contract
self._server = server
self._examples = examples
self._publish_url = publish_url
self._spark = spark

@@ -82,20 +88,27 @@ def test(self) -> Run:

check_that_datacontract_contains_valid_server_configuration(run, data_contract, self._server)
# TODO check yaml contains models
server_name = list(data_contract.servers.keys())[0]
server = data_contract.servers.get(server_name)
run.log_info(f"Running tests for data contract {data_contract.id} with server {server_name}")
run.dataContractId = data_contract.id
run.dataContractVersion = data_contract.info.version
run.dataProductId = server.dataProductId
run.outputPortId = server.outputPortId
run.server = server_name

# 5. check server is supported type
# 6. check server credentials are complete
if server.format == "json":
check_jsonschema(run, data_contract, server)
check_soda_execute(run, data_contract, server, self._spark)
with tempfile.TemporaryDirectory(prefix="datacontract-cli") as tmp_dir:
if self._examples:
server_name = "examples"
server = self._get_examples_server(data_contract, run, tmp_dir)
else:
server_name = list(data_contract.servers.keys())[0]
server = data_contract.servers.get(server_name)

run.log_info(f"Running tests for data contract {data_contract.id} with server {server_name}")
run.dataContractId = data_contract.id
run.dataContractVersion = data_contract.info.version
run.dataProductId = server.dataProductId
run.outputPortId = server.outputPortId
run.server = server_name

# 5. check server is supported type
# 6. check server credentials are complete
if server.format == "json":
check_jsonschema(run, data_contract, server)
check_soda_execute(run, data_contract, server, self._spark)

except DataContractException as e:
run.checks.append(Check(
@@ -125,6 +138,7 @@ def test(self) -> Run:

return run


def diff(self, other):
pass

@@ -140,3 +154,35 @@ def export(self, export_format) -> str:
else:
print(f"Export format {export_format} not supported.")
return ""

def _get_examples_server(self, data_contract, run, tmp_dir):
run.log_info(f"Copying examples to files in temporary directory {tmp_dir}")
format = "json"
for example in data_contract.examples:
format = example.type
p = f"{tmp_dir}/{example.model}.{format}"
run.log_info(f"Creating example file {p}")
with open(p, "w") as f:
content = ""
if format == "json" and type(example.data) is list:
content = json.dumps(example.data)
elif format == "json" and type(example.data) is str:
content = example.data
elif format == "yaml" and type(example.data) is list:
content = yaml.dump(example.data)
elif format == "yaml" and type(example.data) is str:
content = example.data
elif format == "csv":
content = example.data
logging.debug(f"Content of example file {p}: {content}")
f.write(content)
path = f"{tmp_dir}" + "/{model}." + format
delimiter = "array"
server = Server(
type="local",
path=path,
format=format,
delimiter=delimiter,
)
run.log_info(f"Using {server} for testing the examples")
return server
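To make the temp-file mechanism concrete: for the CSV fixture added below, _get_examples_server ends up synthesizing a local server roughly like this sketch (the temporary directory name is generated at runtime and shown here as a placeholder; note the current code sets delimiter to "array" regardless of format):

from datacontract.model.data_contract_specification import Server

# Approximate shape of the synthesized server; "{model}" stays literal so the
# engines can substitute the model name when resolving each model's file.
server = Server(
    type="local",
    path="/tmp/datacontract-cliXXXX/{model}.csv",  # placeholder temp dir
    format="csv",
    delimiter="array",
)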
34 changes: 23 additions & 11 deletions datacontract/engines/fastjsonschema/check_jsonschema.py
@@ -29,29 +29,40 @@ def validate_json_stream(model_name, validate, json_stream):
)


def read_json_lines(file_content: str):
def read_json_lines(file):
file_content = file.read()
for line in file_content.splitlines():
yield json.loads(line)


def read_json_lines_from_file(file):
for line in file:
def read_json_lines_content(file_content: str):
for line in file_content.splitlines():
yield json.loads(line)


def read_json_array(file):
data = json.loads(file)
data = json.load(file)
for item in data:
yield item


def read_json_array_content(file_content: str):
data = json.loads(file_content)
for item in data:
yield item


def read_json_file(file):
yield json.loads(file.read())
yield json.load(file)


def read_json_file_content(file_content: str):
yield json.loads(file_content)


def process_json_file(run, model_name, validate, file, delimiter):
if delimiter == "new_line":
json_stream = read_json_lines_from_file(file)
json_stream = read_json_lines(file)
elif delimiter == "array":
json_stream = read_json_array(file)
else:
@@ -62,11 +73,12 @@ def process_json_file(run, model_name, validate, file, delimiter):
def process_local_file(run, server, model_name, validate):
path = server.path
if "{model}" in path:
path = path.format(model = model_name)
path = path.format(model=model_name)

if os.path.isdir(path):
return process_directory(run, path, server, model_name, validate)
else:
logging.info(f"Processing file {path}")
with open(path, 'r') as file:
process_json_file(run, model_name, validate, file, server.delimiter)

@@ -87,16 +99,16 @@ def process_s3_file(server, model_name, validate):
s3_endpoint_url = server.endpointUrl
s3_location = server.location
if "{model}" in s3_location:
s3_location = s3_location.format(model = model_name)
s3_location = s3_location.format(model=model_name)
json_stream = None

for file_content in yield_s3_files(s3_endpoint_url, s3_location):
if server.delimiter == "new_line":
json_stream = read_json_lines(file_content)
json_stream = read_json_lines_content(file_content)
elif server.delimiter == "array":
json_stream = read_json_array(file_content)
json_stream = read_json_array_content(file_content)
else:
json_stream = read_json_file(file_content)
json_stream = read_json_file_content(file_content)

if json_stream is None:
raise DataContractException(
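The reader split above mirrors how the two sources deliver data: process_local_file hands over an open file object, while yield_s3_files yields the already-read file content, so each delimiter now has a file-based reader and a *_content counterpart. A small self-contained sketch of that distinction, using standalone copies of the two array readers exercised with in-memory data:

import io
import json

def read_json_array(file):
    data = json.load(file)           # expects a file-like object (local code path)
    for item in data:
        yield item

def read_json_array_content(file_content: str):
    data = json.loads(file_content)  # expects already-read content (S3 code path)
    for item in data:
        yield item

print(list(read_json_array(io.StringIO('[{"wert": 99}]'))))
print(list(read_json_array_content('[{"wert": 99}]')))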
2 changes: 1 addition & 1 deletion datacontract/engines/fastjsonschema/s3/s3_read_files.py
@@ -9,7 +9,7 @@ def yield_s3_files(s3_endpoint_url, s3_location):
files = fs.glob(s3_location)
for file in files:
with fs.open(file) as f:
logging.info(f"Reading file {file}")
logging.info(f"Downloading file {file}")
yield f.read()


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -25,7 +25,7 @@ dependencies = [
"soda-core-spark[databricks]~=3.1.5",
"soda-core-spark-df~=3.1.5",
"snowflake-connector-python[pandas]>=3.6,<3.8",
"duckdb>=0.9.3.dev3920",
"duckdb==0.9.3.dev3920",
"fastjsonschema~=2.19.1",
"python-dotenv~=1.0.0",
"s3fs==2024.2.0",
35 changes: 35 additions & 0 deletions tests/examples/examples/datacontract_csv.yaml
@@ -0,0 +1,35 @@
dataContractSpecification: 0.9.2
id: "61111-0002"
info:
title: "Verbraucherpreisindex: Deutschland, Monate"
description: A data contract for the distribution and use of the German Consumer Price Index data.
version: 1.0.0
owner: my-domain-team
models:
verbraucherpreisindex:
description: Model representing the Consumer Price Index for Germany
fields:
wert:
description: Value of the Consumer Price Index
type: bigint # integer
required: true
jahrMonat:
description: Year and month of the data
type: varchar # string
required: true
qualitaet:
description: Quality of the data
type: varchar # string
enum:
- "vorlaeufig"
- "endgueltig"
examples:
- type: csv
description: Example entry for CPI data
model: verbraucherpreisindex
data: |-
wert, jahrMonat, qualitaet
99, "2022-00",
100, "2022-01",
101, "2022-02", "vorlaeufig"

39 changes: 39 additions & 0 deletions tests/examples/examples/datacontract_inline.yaml
@@ -0,0 +1,39 @@
dataContractSpecification: 0.9.2
id: "61111-0002"
info:
title: "Verbraucherpreisindex: Deutschland, Monate"
description: A data contract for the distribution and use of the German Consumer Price Index data.
version: 1.0.0
owner: my-domain-team
models:
verbraucherpreisindex:
description: Model representing the Consumer Price Index for Germany
fields:
wert:
description: Value of the Consumer Price Index
type: integer
required: true
jahrMonat:
description: Year and month of the data
type: string
required: true
qualitaet:
description: Quality of the data
type: string
enum:
- "vorlaeufig"
- "endgueltig"

examples:
- description: Example entry for CPI data
type: json # TODO should be inline
model: verbraucherpreisindex
data:
- wert: 99
jahrMonat: "2022-00"
- wert: 100
jahrMonat: "2022-01"
- wert: 101
jahrMonat: "2022-02"
qualitaet: "vorlaeufig"

35 changes: 35 additions & 0 deletions tests/examples/examples/datacontract_json.yaml
@@ -0,0 +1,35 @@
dataContractSpecification: 0.9.2
id: "61111-0002"
info:
title: "Verbraucherpreisindex: Deutschland, Monate"
description: A data contract for the distribution and use of the German Consumer Price Index data.
version: 1.0.0
owner: my-domain-team
models:
verbraucherpreisindex:
description: Model representing the Consumer Price Index for Germany
fields:
wert:
description: Value of the Consumer Price Index
type: integer
required: true
jahrMonat:
description: Year and month of the data
type: string
required: true
qualitaet:
description: Quality of the data
type: string
enum:
- "vorlaeufig"
- "endgueltig"

examples:
- type: json
description: Example entry for CPI data
model: verbraucherpreisindex
data: |-
[{ "wert": 99, "jahrMonat": "2022-00" },
{ "wert": 100, "jahrMonat": "2022-01" },
{ "wert": 101, "jahrMonat": "2022-02", "qualitaet": "vorlaeufig" }]
25 changes: 25 additions & 0 deletions tests/test_examples_examples_csv.py
@@ -0,0 +1,25 @@
import logging

import pytest
from typer.testing import CliRunner

from datacontract.cli import app
from datacontract.data_contract import DataContract

runner = CliRunner()

logging.basicConfig(level=logging.DEBUG, force=True)

def test_cli():
result = runner.invoke(app, ["test", "--examples", "./examples/examples/datacontract_csv.yaml"])
assert result.exit_code == 0


def test_csv():
data_contract = DataContract(data_contract_file="examples/examples/datacontract_csv.yaml", examples=True)
run = data_contract.test()
print(run)
print(run.result)
assert run.result == "passed"

