From 69f21e0e4d4a96ecfbb2bacf078ae2371364b9a6 Mon Sep 17 00:00:00 2001
From: jochen
Date: Sat, 2 Mar 2024 21:45:54 +0100
Subject: [PATCH] Improve support for Kafka
---
README.md | 28 ++++++-
datacontract/data_contract.py | 2 +-
.../engines/soda/check_soda_execute.py | 6 +-
.../engines/soda/connections/kafka.py | 78 +++++++++++++++----
tests/examples/kafka-remote/datacontract.yaml | 38 +++++++++
tests/examples/kafka/datacontract.yaml | 5 ++
tests/test_examples_kafka.py | 2 +-
tests/test_examples_kafka_remote.py | 33 ++++++++
8 files changed, 172 insertions(+), 20 deletions(-)
create mode 100644 tests/examples/kafka-remote/datacontract.yaml
create mode 100644 tests/test_examples_kafka_remote.py
diff --git a/README.md b/README.md
index 4418e406..ea1d18fe 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
-The `datacontract` CLI is an open source command-line tool for working with [Data Contracts](https://datacontract.com/).
+**The `datacontract` CLI is an open source command-line tool for working with [Data Contracts](https://datacontract.com/).
It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different formats. The tool is written in Python. It can be used as a standalone CLI tool, in a CI/CD pipeline, or directly as a Python library.
@@ -343,6 +343,30 @@ run = data_contract.test()
run.result
```
+### Kafka
+
+Kafka support is currently considered experimental.
+
+#### Example
+
+datacontract.yaml
+```yaml
+servers:
+ production:
+ type: kafka
+ host: abc-12345.eu-central-1.aws.confluent.cloud:9092
+ topic: my-topic-name
+ format: json
+```
+
+#### Environment Variables
+
+| Environment Variable | Example | Description |
+|------------------------------------|---------|-----------------------------|
+| `DATACONTRACT_KAFKA_SASL_USERNAME` | `xxx` | The SASL username (key). |
+| `DATACONTRACT_KAFKA_SASL_PASSWORD` | `xxx` | The SASL password (secret). |
+
+
### Exports
@@ -460,4 +484,4 @@ Created by [Stefan Negele](https://www.linkedin.com/in/stefan-negele-573153112/)
-
+**
diff --git a/datacontract/data_contract.py b/datacontract/data_contract.py
index 3cac093b..0c9fdf56 100644
--- a/datacontract/data_contract.py
+++ b/datacontract/data_contract.py
@@ -121,7 +121,7 @@ def test(self) -> Run:
# 6. check server credentials are complete
if server.format == "json":
check_jsonschema(run, data_contract, server)
- check_soda_execute(run, data_contract, server, self._spark)
+ check_soda_execute(run, data_contract, server, self._spark, tmp_dir)
except DataContractException as e:
run.checks.append(Check(
diff --git a/datacontract/engines/soda/check_soda_execute.py b/datacontract/engines/soda/check_soda_execute.py
index 6441e4ec..867b5876 100644
--- a/datacontract/engines/soda/check_soda_execute.py
+++ b/datacontract/engines/soda/check_soda_execute.py
@@ -20,7 +20,7 @@
Run, Check, Log
-def check_soda_execute(run: Run, data_contract: DataContractSpecification, server: Server, spark):
+def check_soda_execute(run: Run, data_contract: DataContractSpecification, server: Server, spark, tmp_dir):
if data_contract is None:
run.log_warn("Cannot run engine soda-core, as data contract is invalid")
return
@@ -67,8 +67,8 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
scan.set_data_source_name(server.type)
elif server.type == "kafka":
if spark is None:
- spark = create_spark_session()
- read_kafka_topic(spark, data_contract, server)
+ spark = create_spark_session(tmp_dir)
+ read_kafka_topic(spark, data_contract, server, tmp_dir)
scan.add_spark_session(spark, data_source_name=server.type)
scan.set_data_source_name(server.type)
diff --git a/datacontract/engines/soda/connections/kafka.py b/datacontract/engines/soda/connections/kafka.py
index b814da32..e67d5cb2 100644
--- a/datacontract/engines/soda/connections/kafka.py
+++ b/datacontract/engines/soda/connections/kafka.py
@@ -1,3 +1,5 @@
+import os
+
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *
@@ -6,45 +8,95 @@
DataContractSpecification, Server, Field
-def create_spark_session():
- spark = (SparkSession.builder.appName("datacontract")
- .config("spark.streaming.stopGracefullyOnShutdown", True)
- .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
- .getOrCreate())
+def create_spark_session(tmp_dir) -> SparkSession:
+ spark = SparkSession.builder.appName("datacontract") \
+ .config("spark.sql.warehouse.dir", tmp_dir + "/spark-warehouse") \
+ .config("spark.streaming.stopGracefullyOnShutdown", True) \
+ .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0') \
+ .getOrCreate()
print(f'Using PySpark version {spark.version}')
return spark
-def read_kafka_topic(spark, data_contract: DataContractSpecification, server: Server):
+def read_kafka_topic(spark: SparkSession, data_contract: DataContractSpecification, server: Server, tmp_dir):
host = server.host
topic = server.topic
+ auth_options = get_auth_options()
+
df = spark \
- .readStream \
+ .read \
.format("kafka") \
+ .options(**auth_options) \
.option("kafka.bootstrap.servers", host) \
.option("subscribe", topic) \
+ .option("startingOffsets", "earliest") \
.load()
df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- schema = to_struct_type(data_contract)
+ name, model1 = next(iter(data_contract.models.items()))
+ schema = to_struct_type(model1.fields)
# TODO A good warning when the conversion to json fails
df3 = df2.select(from_json(df2.value, schema).alias("json")).select(col("json.*"))
model_name, model = next(iter(data_contract.models.items()))
+ # df3.writeStream.toTable(model_name, checkpointLocation=tmp_dir + "/checkpoint")
df3.createOrReplaceTempView(model_name)
+ print(spark.sql(f"select * from {model_name}").show())
-def to_struct_type(data_contract: DataContractSpecification) -> StructType:
- model_name, model = next(iter(data_contract.models.items()))
+
+def get_auth_options():
+ kafka_sasl_username = os.getenv('DATACONTRACT_KAFKA_SASL_USERNAME')
+ kafka_sasl_password = os.getenv('DATACONTRACT_KAFKA_SASL_PASSWORD')
+ if kafka_sasl_username is None:
+ auth_options = {}
+ else:
+ kafka_sasl_jaas_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_sasl_username}" password="{kafka_sasl_password}";'
+ auth_options = {
+ "kafka.sasl.mechanism": "PLAIN",
+ "kafka.security.protocol": "SASL_SSL",
+ "kafka.sasl.jaas.config": kafka_sasl_jaas_config,
+ }
+ return auth_options
+
+
+def to_struct_type(fields):
struct_fields = []
- for field_name, field in model.fields.items():
+ for field_name, field in fields.items():
struct_fields.append(to_struct_field(field_name, field))
return StructType(struct_fields)
def to_struct_field(field_name: str, field: Field) -> StructField:
- if field.type == "string":
+ if field.type is None:
+ data_type = DataType()
+ if field.type in ["string", "varchar", "text"]:
data_type = StringType()
- elif field.type == "int":
+ elif field.type in ["number", "decimal", "numeric"]:
+ data_type = DecimalType()
+ elif field.type in ["float", "double"]:
+ data_type = DoubleType()
+ elif field.type in ["integer", "int"]:
data_type = IntegerType()
+ elif field.type in ["long", "bigint"]:
+ data_type = LongType()
+ elif field.type in ["boolean"]:
+ data_type = BooleanType()
+ elif field.type in ["timestamp", "timestamp_tz"]:
+ data_type = TimestampType()
+ elif field.type in ["timestamp_ntz"]:
+ data_type = TimestampNTZType()
+ elif field.type in ["date"]:
+ data_type = DateType()
+ elif field.type in ["time"]:
+ data_type = DataType()
+ elif field.type in ["object", "record", "struct"]:
+ data_type = to_struct_type(field.fields)
+ elif field.type in ["binary"]:
+ data_type = BinaryType()
+ elif field.type in ["array"]:
+ # TODO support array structs
+ data_type = ArrayType()
+ elif field.type in ["null"]:
+ data_type = NullType()
else:
data_type = DataType()
diff --git a/tests/examples/kafka-remote/datacontract.yaml b/tests/examples/kafka-remote/datacontract.yaml
new file mode 100644
index 00000000..7e1733e5
--- /dev/null
+++ b/tests/examples/kafka-remote/datacontract.yaml
@@ -0,0 +1,38 @@
+dataContractSpecification: 0.9.2
+id: orders
+info:
+ title: Orders
+ version: 0.0.1
+ description: Order messages as generated by Confluent Datagen Source Adapter
+servers:
+ production:
+ type: kafka
+ topic: datamesh.orders.v1
+ host: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092
+models:
+ orders:
+ type: table
+ fields:
+ ordertime:
+ type: bigint
+ orderid:
+ type: int
+ itemid:
+ type: string
+ orderunits:
+ type: double
+ address:
+ type: object
+ fields:
+ city:
+ type: string
+ state:
+ type: string
+ zipcode:
+ type: string
+quality:
+ type: SodaCL
+ specification:
+ checks for orders:
+ - row_count >= 5000
+
diff --git a/tests/examples/kafka/datacontract.yaml b/tests/examples/kafka/datacontract.yaml
index 96a20b86..ccb3b288 100644
--- a/tests/examples/kafka/datacontract.yaml
+++ b/tests/examples/kafka/datacontract.yaml
@@ -22,3 +22,8 @@ models:
type: string
sku:
type: string
+quality:
+ type: SodaCL
+ specification:
+ checks for inventory:
+ - row_count >= 10
diff --git a/tests/test_examples_kafka.py b/tests/test_examples_kafka.py
index 9ba31094..b1698896 100644
--- a/tests/test_examples_kafka.py
+++ b/tests/test_examples_kafka.py
@@ -33,7 +33,7 @@ def remove_container():
def test_examples_kafka(kafka_container: KafkaContainer):
- send_messages_to_topic("examples/kafka/data/messages.json", 'json-topic')
+ send_messages_to_topic("examples/kafka/data/messages.json", "inventory-events")
data_contract_str = _setup_datacontract()
data_contract = DataContract(data_contract_str=data_contract_str)
diff --git a/tests/test_examples_kafka_remote.py b/tests/test_examples_kafka_remote.py
new file mode 100644
index 00000000..cd11bbdc
--- /dev/null
+++ b/tests/test_examples_kafka_remote.py
@@ -0,0 +1,33 @@
+import os
+import sys
+
+import pytest
+import six
+
+# Fix for Python 3.12
+if sys.version_info >= (3, 12, 1):
+ sys.modules['kafka.vendor.six.moves'] = six.moves
+
+import logging
+
+from dotenv import load_dotenv
+
+from datacontract.data_contract import DataContract
+
+logging.basicConfig(level=logging.INFO, force=True)
+
+datacontract = "examples/kafka-remote/datacontract.yaml"
+
+
+@pytest.mark.skipif(os.environ.get("DATACONTRACT_KAFKA_SASL_USERNAME") is None,
+ reason="Requires DATACONTRACT_KAFKA_SASL_USERNAME to be set")
+def _test_examples_kafka_remote():
+ load_dotenv(override=True)
+ # os.environ['DATACONTRACT_KAFKA_SASL_USERNAME'] = "xxx"
+ # os.environ['DATACONTRACT_KAFKA_SASL_PASSWORD'] = "xxx"
+ data_contract = DataContract(data_contract_file=datacontract)
+
+ run = data_contract.test()
+
+ print(run)
+ assert run.result == "passed"