Skip to content

Commit

Permalink
feat(ingest/great_expectations): support in-memory (Pandas) data asse…
Browse files Browse the repository at this point in the history
…ts (datahub-project#9811)

Co-authored-by: Achraf BOUAOUDA <[email protected]>
  • Loading branch information
bouaouda-achraf and Achraf BOUAOUDA authored Apr 4, 2024
1 parent f38b626 commit b5615fa
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import (
RuntimeDataBatchSpec,
RuntimeQueryBatchSpec,
SqlAlchemyDatasourceBatchSpec,
)
Expand All @@ -24,6 +25,7 @@
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.execution_engine.sqlalchemy_execution_engine import (
SqlAlchemyExecutionEngine,
)
Expand Down Expand Up @@ -566,10 +568,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset):

logger.debug("Finding datasets being validated")

# for now, we support only v3-api and sqlalchemy execution engine
if isinstance(data_asset, Validator) and isinstance(
data_asset.execution_engine, SqlAlchemyExecutionEngine
):
# for now, we support only v3-api and sqlalchemy execution engine and Pandas engine
is_sql_alchemy = isinstance(data_asset, Validator) and (
isinstance(data_asset.execution_engine, SqlAlchemyExecutionEngine)
)
is_pandas = isinstance(data_asset.execution_engine, PandasExecutionEngine)
if is_sql_alchemy or is_pandas:
ge_batch_spec = data_asset.active_batch_spec
partitionSpec = None
batchSpecProperties = {
Expand All @@ -581,10 +585,14 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
),
}
sqlalchemy_uri = None
if isinstance(data_asset.execution_engine.engine, Engine):
if is_sql_alchemy and isinstance(
data_asset.execution_engine.engine, Engine
):
sqlalchemy_uri = data_asset.execution_engine.engine.url
# For snowflake sqlalchemy_execution_engine.engine is actually instance of Connection
elif isinstance(data_asset.execution_engine.engine, Connection):
elif is_sql_alchemy and isinstance(
data_asset.execution_engine.engine, Connection
):
sqlalchemy_uri = data_asset.execution_engine.engine.engine.url

if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec):
Expand Down Expand Up @@ -680,6 +688,30 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
"batchSpec": batchSpec,
}
)
elif isinstance(ge_batch_spec, RuntimeDataBatchSpec):
data_platform = self.get_platform_instance(
data_asset.active_batch_definition.datasource_name
)
dataset_urn = builder.make_dataset_urn_with_platform_instance(
platform=data_platform
if self.platform_alias is None
else self.platform_alias,
name=data_asset.active_batch_definition.datasource_name,
platform_instance="",
env=self.env,
)
batchSpec = BatchSpec(
nativeBatchId=batch_identifier,
query="",
customProperties=batchSpecProperties,
)
dataset_partitions.append(
{
"dataset_urn": dataset_urn,
"partitionSpec": partitionSpec,
"batchSpec": batchSpec,
}
)
else:
warn(
"DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format(
Expand Down
163 changes: 158 additions & 5 deletions metadata-ingestion/tests/unit/test_great_expectations_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
from datetime import datetime, timezone
from unittest import mock

import pandas as pd
import pytest
from great_expectations.core.batch import Batch, BatchDefinition, BatchRequest
from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec
from great_expectations.core.batch_spec import (
RuntimeDataBatchSpec,
SqlAlchemyDatasourceBatchSpec,
)
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
)
Expand All @@ -21,6 +25,9 @@
from great_expectations.execution_engine.pandas_execution_engine import (
PandasExecutionEngine,
)
from great_expectations.execution_engine.sparkdf_execution_engine import (
SparkDFExecutionEngine,
)
from great_expectations.execution_engine.sqlalchemy_execution_engine import (
SqlAlchemyExecutionEngine,
)
Expand Down Expand Up @@ -87,12 +94,80 @@ def ge_validator_sqlalchemy() -> Validator:
return validator


@pytest.fixture(scope="function")
def ge_validator_spark() -> Validator:
validator = Validator(execution_engine=SparkDFExecutionEngine())
return validator


@pytest.fixture(scope="function")
def ge_validator_pandas() -> Validator:
validator = Validator(execution_engine=PandasExecutionEngine())
validator = Validator(
execution_engine=PandasExecutionEngine(),
batches=[
Batch(
data=pd.DataFrame({"foo": [10, 20], "bar": [100, 200]}),
batch_request=BatchRequest(
datasource_name="my_df_datasource",
data_connector_name="pandas_df",
data_asset_name="foobar",
),
batch_definition=BatchDefinition(
datasource_name="my_df_datasource",
data_connector_name="pandas_df",
data_asset_name="foobar",
batch_identifiers=IDDict(),
),
batch_spec=RuntimeDataBatchSpec(
{
"data_asset_name": "foobar",
"batch_identifiers": {},
"batch_data": {},
"type": "pandas_dataframe",
}
),
)
],
)
return validator


@pytest.fixture(scope="function")
def ge_validation_result_suite_pandas() -> ExpectationSuiteValidationResult:
validation_result_suite = ExpectationSuiteValidationResult(
results=[
{
"success": True,
"result": {},
"expectation_config": {
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "column",
"batch_id": "010ef8c1cd417910b971f4468f024ec6",
},
"meta": {},
},
}
],
success=True,
statistics={
"evaluated_expectations": 1,
"successful_expectations": 1,
"unsuccessful_expectations": 0,
"success_percent": 100,
},
meta={
"great_expectations_version": "v0.13.40",
"expectation_suite_name": "asset.default",
"run_id": {
"run_name": "test_200",
},
"validation_time": "20211228T130000.000000Z",
},
)
return validation_result_suite


@pytest.fixture(scope="function")
def ge_validation_result_suite() -> ExpectationSuiteValidationResult:
validation_result_suite = ExpectationSuiteValidationResult(
Expand Down Expand Up @@ -144,8 +219,22 @@ def ge_validation_result_suite_id() -> ValidationResultIdentifier:
return validation_result_suite_id


@pytest.fixture(scope="function")
def ge_validation_result_suite_id_pandas() -> ValidationResultIdentifier:
validation_result_suite_id = ValidationResultIdentifier(
expectation_suite_identifier=ExpectationSuiteIdentifier("asset.default"),
run_id=RunIdentifier(
run_name="test_200",
run_time=datetime.fromtimestamp(1640701702, tz=timezone.utc),
),
batch_identifier="010ef8c1cd417910b971f4468f024ec6",
)

return validation_result_suite_id


@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter.emit_mcp", autospec=True)
def test_DataHubValidationAction_basic(
def test_DataHubValidationAction_sqlalchemy(
mock_emitter: mock.MagicMock,
ge_data_context: DataContext,
ge_validator_sqlalchemy: Validator,
Expand Down Expand Up @@ -248,6 +337,70 @@ def test_DataHubValidationAction_basic(
)


@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter.emit_mcp", autospec=True)
def test_DataHubValidationAction_pandas(
mock_emitter: mock.MagicMock,
ge_data_context: DataContext,
ge_validator_pandas: Validator,
ge_validation_result_suite_pandas: ExpectationSuiteValidationResult,
ge_validation_result_suite_id_pandas: ValidationResultIdentifier,
) -> None:
server_url = "http://localhost:9999"

datahub_action = DataHubValidationAction(
data_context=ge_data_context,
server_url=server_url,
platform_instance_map={"my_df_datasource": "custom_platefrom"},
)

assert datahub_action.run(
validation_result_suite_identifier=ge_validation_result_suite_id_pandas,
validation_result_suite=ge_validation_result_suite_pandas,
data_asset=ge_validator_pandas,
) == {"datahub_notification_result": "DataHub notification succeeded"}

mock_emitter.assert_has_calls(
[
mock.call(
mock.ANY,
MetadataChangeProposalWrapper(
entityType="assertion",
changeType="UPSERT",
entityUrn="urn:li:assertion:7e04bcc3b85897d6d3fef6c998db6b05",
aspectName="assertionInfo",
aspect=AssertionInfoClass(
customProperties={"expectation_suite_name": "asset.default"},
type="DATASET",
datasetAssertion=DatasetAssertionInfoClass(
dataset="urn:li:dataset:(urn:li:dataPlatform:custom_platefrom,my_df_datasource,PROD)",
scope=DatasetAssertionScopeClass.DATASET_COLUMN,
operator="NOT_NULL",
fields=[
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:custom_platefrom,my_df_datasource,PROD),column)"
],
aggregation="IDENTITY",
nativeType="expect_column_values_to_not_be_null",
nativeParameters={"column": "column"},
),
),
),
),
mock.call(
mock.ANY,
MetadataChangeProposalWrapper(
entityType="assertion",
changeType="UPSERT",
entityUrn="urn:li:assertion:7e04bcc3b85897d6d3fef6c998db6b05",
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform="urn:li:dataPlatform:great-expectations"
),
),
),
]
)


def test_DataHubValidationAction_graceful_failure(
ge_data_context: DataContext,
ge_validator_sqlalchemy: Validator,
Expand All @@ -269,7 +422,7 @@ def test_DataHubValidationAction_graceful_failure(

def test_DataHubValidationAction_not_supported(
ge_data_context: DataContext,
ge_validator_pandas: Validator,
ge_validator_spark: Validator,
ge_validation_result_suite: ExpectationSuiteValidationResult,
ge_validation_result_suite_id: ValidationResultIdentifier,
) -> None:
Expand All @@ -282,5 +435,5 @@ def test_DataHubValidationAction_not_supported(
assert datahub_action.run(
validation_result_suite_identifier=ge_validation_result_suite_id,
validation_result_suite=ge_validation_result_suite,
data_asset=ge_validator_pandas,
data_asset=ge_validator_spark,
) == {"datahub_notification_result": "none required"}

0 comments on commit b5615fa

Please sign in to comment.