diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa97c876..82702264 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,7 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+This is a huge step forward: we now support testing Kafka messages.
+We start with JSON messages; Avro and Protobuf will follow.
+
 ### Added
+- test kafka for JSON messages
 - import sql (#51)
 - export to dbt sources: `datacontract export --format dbt-sources`
 - export to dbt staging sql: `datacontract export --format dbt-staging-sql`
diff --git a/README.md b/README.md
index c296c006..6f1d3e60 100644
--- a/README.md
+++ b/README.md
@@ -182,7 +182,7 @@ The application uses different engines, based on the server `type`.
 | `redshift` | n/a | | Coming soon | TBD |
 | `databricks` | n/a | Support for Databricks SQL with Unity catalog and Hive metastore. | ✅ | soda-core-spark |
 | `databricks` | n/a | Support for Spark for programmatic use in Notebooks. | ✅ | soda-core-spark-df |
-| `kafka` | `json` | | Coming soon | TBD |
+| `kafka` | `json` | Experimental. | ✅ | pyspark, soda-core-spark-df |
 | `kafka` | `avro` | | Coming soon | TBD |
 | `kafka` | `protobuf` | | Coming soon | TBD |
 | `local` | `parquet` | | ✅ | soda-core-duckdb |
diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py
index 175f27db..c9b779d1 100644
--- a/datacontract/data_contract.py
+++ b/datacontract/data_contract.py
@@ -100,6 +100,7 @@ def test(self) -> Run:
 
         check_that_datacontract_contains_valid_server_configuration(run, data_contract, self._server)
         # TODO check yaml contains models
+        # TODO create directory only for examples
         with tempfile.TemporaryDirectory(prefix="datacontract-cli") as tmp_dir:
             if self._examples:
                 server_name = "examples"
diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py
index 913b5d8c..6441e4ec 100644
--- a/datacontract/engines/soda/check_soda_execute.py
+++ b/datacontract/engines/soda/check_soda_execute.py
@@ -7,6 +7,8 @@
 from datacontract.engines.soda.connections.databricks import \
     to_databricks_soda_configuration
 from datacontract.engines.soda.connections.duckdb import get_duckdb_connection
+from datacontract.engines.soda.connections.kafka import create_spark_session, \
+    read_kafka_topic
 from datacontract.engines.soda.connections.postgres import \
     to_postgres_soda_configuration
 from datacontract.engines.soda.connections.snowflake import \
@@ -63,6 +65,13 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
         soda_configuration_str = to_databricks_soda_configuration(server)
         scan.add_configuration_yaml_str(soda_configuration_str)
         scan.set_data_source_name(server.type)
+    elif server.type == "kafka":
+        if spark is None:
+            spark = create_spark_session()
+        read_kafka_topic(spark, data_contract, server)
+        scan.add_spark_session(spark, data_source_name=server.type)
+        scan.set_data_source_name(server.type)
+
     else:
         run.checks.append(Check(
             type="general",
@@ -92,7 +101,8 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
     for c in scan_results.get("checks"):
         check = Check(
             type="schema",
-            result="passed" if c.get("outcome") == "pass" else "failed" if c.get("outcome") == "fail" else c.get("outcome"),
+            result="passed" if c.get("outcome") == "pass" else "failed" if c.get("outcome") == "fail" else c.get(
+                "outcome"),
             reason=', '.join(c.get("outcomeReasons")),
             name=c.get("name"),
             model=c.get("table"),
diff --git a/datacontract/engines/soda/connections/kafka.py b/datacontract/engines/soda/connections/kafka.py
new file mode 100644
index 00000000..b814da32
--- /dev/null
+++ b/datacontract/engines/soda/connections/kafka.py
@@ -0,0 +1,51 @@
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import from_json, col
+from pyspark.sql.types import *
+
+from datacontract.model.data_contract_specification import \
+    DataContractSpecification, Server, Field
+
+
+def create_spark_session():
+    spark = (SparkSession.builder.appName("datacontract")
+             .config("spark.streaming.stopGracefullyOnShutdown", True)
+             .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
+             .getOrCreate())
+    print(f'Using PySpark version {spark.version}')
+    return spark
+
+
+def read_kafka_topic(spark, data_contract: DataContractSpecification, server: Server):
+    host = server.host
+    topic = server.topic
+    df = spark \
+        .readStream \
+        .format("kafka") \
+        .option("kafka.bootstrap.servers", host) \
+        .option("subscribe", topic) \
+        .load()
+    df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+    schema = to_struct_type(data_contract)
+    # TODO A good warning when the conversion to json fails
+    df3 = df2.select(from_json(df2.value, schema).alias("json")).select(col("json.*"))
+    model_name, model = next(iter(data_contract.models.items()))
+    df3.createOrReplaceTempView(model_name)
+
+
+def to_struct_type(data_contract: DataContractSpecification) -> StructType:
+    model_name, model = next(iter(data_contract.models.items()))
+    struct_fields = []
+    for field_name, field in model.fields.items():
+        struct_fields.append(to_struct_field(field_name, field))
+    return StructType(struct_fields)
+
+
+def to_struct_field(field_name: str, field: Field) -> StructField:
+    if field.type == "string":
+        data_type = StringType()
+    elif field.type == "int":
+        data_type = IntegerType()
+    else:
+        data_type = DataType()
+
+    return StructField(field_name, data_type, nullable=not field.required)
diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py
index a160e9c5..819637ec 100644
--- a/datacontract/model/data_contract_specification.py
+++ b/datacontract/model/data_contract_specification.py
@@ -27,6 +27,7 @@ class Server(BaseModel):
     host: str = None
     port: int = None
     catalog: str = None
+    topic: str = None
     http_path: str = None  # Use ENV variable
     token: str = None  # Use ENV variable
     dataProductId: str = None
diff --git a/tests/examples/kafka/datacontract.yaml b/tests/examples/kafka/datacontract.yaml
index 9d0ca21b..96a20b86 100644
--- a/tests/examples/kafka/datacontract.yaml
+++ b/tests/examples/kafka/datacontract.yaml
@@ -17,7 +17,7 @@ models:
       updated_at:
         type: string
       available:
-        type: numeric
+        type: int
       location:
         type: string
       sku:
diff --git a/tests/test_examples_kafka.py b/tests/test_examples_kafka.py
index 604af70e..9ba31094 100644
--- a/tests/test_examples_kafka.py
+++ b/tests/test_examples_kafka.py
@@ -2,10 +2,10 @@
 
 import six
 
+# Fix for Python 3.12
 if sys.version_info >= (3, 12, 1):
     sys.modules['kafka.vendor.six.moves'] = six.moves
-
 
 import json
 import logging
 
@@ -15,7 +15,7 @@
 
 from datacontract.data_contract import DataContract
 
-logging.basicConfig(level=logging.DEBUG, force=True)
+logging.basicConfig(level=logging.INFO, force=True)
 
 datacontract = "examples/kafka/datacontract.yaml"
 
@@ -31,22 +31,19 @@ def remove_container():
     request.addfinalizer(remove_container)
 
 
-# skip this test
+
 def test_examples_kafka(kafka_container: KafkaContainer):
     send_messages_to_topic("examples/kafka/data/messages.json", 'json-topic')
-
-    assert True
 
     data_contract_str = _setup_datacontract()
     data_contract = DataContract(data_contract_str=data_contract_str)
     run = data_contract.test()
 
     print(run)
-    assert run.result == "warning"
-    # assert all(check.result == "passed" for check in run.checks)
+    assert run.result == "passed"
 
 
-def send_messages_to_topic(messages_file_path : str, topic_name : str):
+def send_messages_to_topic(messages_file_path: str, topic_name: str):
     print(f"Sending messages from {messages_file_path} to Kafka topic {topic_name}")
     producer = KafkaProducer(bootstrap_servers=kafka.get_bootstrap_server(),
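
For context, a data contract that exercises the new Kafka support would look roughly like the sketch below. This is illustrative only and not part of the diff: the server and model names are made up, but the field names mirror the test contract above. The server's `host` is passed to Spark as `kafka.bootstrap.servers`, `topic` is the subscribed topic, and the model's fields are mapped to a Spark StructType by `to_struct_type`, which currently handles `string` and `int`.

servers:
  my-kafka:                # hypothetical server name
    type: kafka            # routes testing through the new pyspark path in check_soda_execute
    host: localhost:9092   # used as kafka.bootstrap.servers
    topic: json-topic      # topic that the streaming read subscribes to
models:
  inventory:               # hypothetical model name; the first model becomes the temp view name
    fields:
      updated_at:
        type: string
      available:
        type: int          # int maps to IntegerType, string to StringType
      location:
        type: string
      sku:
        type: string

The checks defined for that model then run via soda-core-spark-df against the temporary view that `read_kafka_topic` registers under the model name.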