Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
syou6162 committed Dec 10, 2024
2 parents 35f861b + 117aa40 commit 78e483d
Show file tree
Hide file tree
Showing 9 changed files with 13,058 additions and 36 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

### Changed
- Type conversion when importing contracts into dbt and exporting contracts from dbt (#534)

### Fixed
- Modify the arguments to narrow down the import target with `--dbt-model` (#532)

- SodaCL: Prevent a `KeyError: 'fail'` when a check's diagnostics contain no `fail` threshold

## [0.10.15] - 2024-10-26

Expand Down
2 changes: 1 addition & 1 deletion datacontract/engines/soda/check_soda_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,5 @@ def update_reason(check, c):
check.reason = diagnostics_text_split[1].strip()
# print(check.reason)
break # Exit the loop once the desired block is found
if c["diagnostics"]["fail"] is not None:
if "fail" in c["diagnostics"]:
check.reason = f"Got: {c['diagnostics']['value']} Expected: {c['diagnostics']['fail']}"
34 changes: 18 additions & 16 deletions datacontract/export/dbt_converter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Dict, Optional

import yaml

Expand Down Expand Up @@ -52,7 +52,7 @@ def to_dbt_staging_sql(data_contract_spec: DataContractSpecification, model_name
# TODO escape SQL reserved key words, probably dependent on server type
columns.append(field_name)
return f"""
select
select
{", ".join(columns)}
from {{{{ source('{id}', '{model_name}') }}}}
"""
Expand All @@ -69,24 +69,26 @@ def to_dbt_sources_yaml(data_contract_spec: DataContractSpecification, server: s
if data_contract_spec.info.description is not None:
source["description"] = data_contract_spec.info.description
found_server = data_contract_spec.servers.get(server)
adapter_type = None
if found_server is not None:
adapter_type = found_server.type
source["database"] = found_server.database
source["schema"] = found_server.schema_

for model_key, model_value in data_contract_spec.models.items():
dbt_model = _to_dbt_source_table(model_key, model_value)
dbt_model = _to_dbt_source_table(model_key, model_value, adapter_type)
source["tables"].append(dbt_model)
return yaml.dump(dbt, indent=2, sort_keys=False, allow_unicode=True)


def _to_dbt_source_table(model_key, model_value: Model) -> dict:
def _to_dbt_source_table(model_key, model_value: Model, adapter_type: Optional[str]) -> dict:
dbt_model = {
"name": model_key,
}

if model_value.description is not None:
dbt_model["description"] = model_value.description
columns = _to_columns(model_value.fields, False, False)
columns = _to_columns(model_value.fields, False, adapter_type)
if columns:
dbt_model["columns"] = columns
return dbt_model
Expand All @@ -107,7 +109,7 @@ def _to_dbt_model(model_key, model_value: Model, data_contract_spec: DataContrac
dbt_model["config"]["contract"] = {"enforced": True}
if model_value.description is not None:
dbt_model["description"] = model_value.description
columns = _to_columns(model_value.fields, _supports_constraints(model_type), True)
columns = _to_columns(model_value.fields, _supports_constraints(model_type), None)
if columns:
dbt_model["columns"] = columns
return dbt_model
Expand All @@ -130,25 +132,25 @@ def _supports_constraints(model_type):
return model_type == "table" or model_type == "incremental"


def _to_columns(fields: Dict[str, Field], supports_constraints: bool, supports_datatype: bool) -> list:
def _to_columns(fields: Dict[str, Field], supports_constraints: bool, adapter_type: Optional[str]) -> list:
    """Convert contract fields into a list of dbt column dicts.

    Each entry is produced by ``_to_column`` and tagged with its field name;
    ``adapter_type`` controls which SQL dialect the data types are rendered in.
    """
    return [
        {**_to_column(field, supports_constraints, adapter_type), "name": field_name}
        for field_name, field in fields.items()
    ]


def _to_column(field: Field, supports_constraints: bool, supports_datatype: bool) -> dict:
def _to_column(field: Field, supports_constraints: bool, adapter_type: Optional[str]) -> dict:
column = {}
dbt_type = convert_to_sql_type(field, "snowflake")
adapter_type = adapter_type or "snowflake"
dbt_type = convert_to_sql_type(field, adapter_type)
if dbt_type is not None:
if supports_datatype:
column["data_type"] = dbt_type
else:
column.setdefault("tests", []).append(
{"dbt_expectations.dbt_expectations.expect_column_values_to_be_of_type": {"column_type": dbt_type}}
)
column["data_type"] = dbt_type
else:
column.setdefault("tests", []).append(
{"dbt_expectations.dbt_expectations.expect_column_values_to_be_of_type": {"column_type": dbt_type}}
)
if field.description is not None:
column["description"] = field.description
if field.required:
Expand Down
14 changes: 10 additions & 4 deletions datacontract/imports/dbt_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dbt.artifacts.resources.v1.components import ColumnInfo
from dbt.contracts.graph.manifest import Manifest

from datacontract.imports.bigquery_importer import map_type_from_bigquery
from datacontract.imports.importer import Importer
from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model

Expand Down Expand Up @@ -58,7 +59,7 @@ def import_dbt_manifest(
"""
data_contract_specification.info.title = manifest.metadata.project_name
data_contract_specification.info.dbt_version = manifest.metadata.dbt_version

adapter_type = manifest.metadata.adapter_type
data_contract_specification.models = data_contract_specification.models or {}
for model_contents in manifest.nodes.values():
# Only interested in processing models.
Expand All @@ -73,19 +74,24 @@ def import_dbt_manifest(
dc_model = Model(
description=model_contents.description,
tags=model_contents.tags,
fields=create_fields(columns=model_contents.columns),
fields=create_fields(columns=model_contents.columns, adapter_type=adapter_type),
)

data_contract_specification.models[model_contents.name] = dc_model

return data_contract_specification

def convert_data_type_by_adapter_type(data_type: str, adapter_type: str) -> str:
    """Normalize a dbt column data type for the given adapter.

    BigQuery type names are mapped into the contract's type vocabulary;
    types from any other adapter are passed through unchanged.
    """
    if adapter_type != "bigquery":
        return data_type
    return map_type_from_bigquery(data_type)


def create_fields(columns: dict[str, ColumnInfo]) -> dict[str, Field]:
def create_fields(columns: dict[str, ColumnInfo], adapter_type: str) -> dict[str, Field]:
fields = {
column.name: Field(
description=column.description,
type=column.data_type if column.data_type else "",
type=convert_data_type_by_adapter_type(column.data_type, adapter_type) if column.data_type else "",
tags=column.tags,
)
for column in columns.values()
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ dependencies = [
"pydantic>=2.8.2,<2.11.0",
"pyyaml~=6.0.1",
"requests>=2.31,<2.33",
"fastapi==0.115.5", # move to extra?
"fastapi==0.115.6", # move to extra?
"uvicorn==0.32.1", # move to extra?
"fastjsonschema>=2.19.1,<2.21.0",
"fastjsonschema>=2.19.1,<2.22.0",
"fastparquet==2024.5.0",
"python-multipart==0.0.19",
"rich>=13.7,<13.10",
Expand Down Expand Up @@ -106,12 +106,12 @@ dev = [
"datacontract-cli[all]",
"httpx==0.28.0",
"kafka-python",
"moto==5.0.18",
"moto==5.0.22",
"pandas>=2.1.0",
"pre-commit>=3.7.1,<4.1.0",
"pytest",
"pytest-xdist",
"pymssql==2.3.1",
"pymssql==2.3.2",
"ruff",
"testcontainers[minio,postgres,kafka,mssql]==4.8.2",
"trino==0.330.0",
Expand Down
68 changes: 68 additions & 0 deletions tests/fixtures/dbt/export/datacontract.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
dataContractSpecification: 1.1.0
id: orders-unit-test
info:
title: Orders Unit Test
version: 1.0.0
status: active
owner: checkout
description: The orders data contract
contact:
email: [email protected]
url: https://wiki.example.com/teams/checkout
otherField: otherValue
terms:
usage: This data contract serves to demo datacontract CLI export.
limitations: Not intended to use in production
billing: free
noticePeriod: P3M
servers:
production:
type: bigquery
environment: production
account: my-account
database: my-database
schema: my-schema
roles:
- name: analyst_us
description: Access to the data for US region
models:
orders:
title: orders
type: table
description: The orders model
fields:
order_id:
title: Order ID
type: varchar
unique: true
required: true
minLength: 8
maxLength: 10
pii: true
classification: sensitive
tags:
- order_id
pattern: ^B[0-9]+$
examples:
- B12345678
- B12345679
order_total:
type: bigint
required: true
description: The order_total field
minimum: 0
maximum: 1000000
quality:
- type: sql
description: 95% of all order total values are expected to be between 10 and 499 EUR.
query: |
SELECT quantile_cont(order_total, 0.95) AS percentile_95
FROM orders
mustBeBetween: [1000, 49900]
order_status:
type: text
required: true
enum:
- pending
- shipped
- delivered
Loading

0 comments on commit 78e483d

Please sign in to comment.