Commit

Add Kafka support
jochenchrist committed Mar 1, 2024
1 parent 0be6395 commit 1273901
Showing 8 changed files with 75 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,7 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

This is a huge step forward: we now support testing Kafka messages.
We start with JSON messages; Avro and Protobuf will follow.

### Added
- test Kafka for JSON messages
- import sql (#51)
- export to dbt sources: `datacontract export --format dbt-sources`
- export to dbt staging sql: `datacontract export --format dbt-staging-sql`
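As a quick orientation (not part of this commit), the new Kafka test path can be exercised from Python by pointing a DataContract at the example contract under tests/examples/kafka, mirroring tests/test_examples_kafka.py; this assumes a reachable Kafka broker matching the contract's server entry and a topic that already contains JSON messages:

```python
from datacontract.data_contract import DataContract

# Sketch: read the example contract into a string and run the checks against it.
# The file path and the broker behind the contract's kafka server are assumptions.
with open("tests/examples/kafka/datacontract.yaml") as f:
    data_contract_str = f.read()

data_contract = DataContract(data_contract_str=data_contract_str)
run = data_contract.test()
print(run.result)  # "passed" when all checks pass
```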
2 changes: 1 addition & 1 deletion README.md
@@ -182,7 +182,7 @@ The application uses different engines, based on the server `type`.
| `redshift` | n/a | | Coming soon | TBD |
| `databricks` | n/a | Support for Databricks SQL with Unity catalog and Hive metastore. || soda-core-spark |
| `databricks` | n/a | Support for Spark for programmatic use in Notebooks. || soda-core-spark-df |
| `kafka` | `json` | | Coming soon | TBD |
| `kafka` | `json` | Experimental. | | pyspark<br>soda-core-spark-df |
| `kafka` | `avro` | | Coming soon | TBD |
| `kafka` | `protobuf` | | Coming soon | TBD |
| `local` | `parquet` | || soda-core-duckdb |
1 change: 1 addition & 0 deletions datacontract/data_contract.py
@@ -100,6 +100,7 @@ def test(self) -> Run:
check_that_datacontract_contains_valid_server_configuration(run, data_contract, self._server)
# TODO check yaml contains models

# TODO create directory only for examples
with tempfile.TemporaryDirectory(prefix="datacontract-cli") as tmp_dir:
if self._examples:
server_name = "examples"
12 changes: 11 additions & 1 deletion datacontract/engines/soda/check_soda_execute.py
@@ -7,6 +7,8 @@
from datacontract.engines.soda.connections.databricks import \
to_databricks_soda_configuration
from datacontract.engines.soda.connections.duckdb import get_duckdb_connection
from datacontract.engines.soda.connections.kafka import create_spark_session, \
read_kafka_topic
from datacontract.engines.soda.connections.postgres import \
to_postgres_soda_configuration
from datacontract.engines.soda.connections.snowflake import \
@@ -63,6 +65,13 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
soda_configuration_str = to_databricks_soda_configuration(server)
scan.add_configuration_yaml_str(soda_configuration_str)
scan.set_data_source_name(server.type)
elif server.type == "kafka":
if spark is None:
spark = create_spark_session()
read_kafka_topic(spark, data_contract, server)
scan.add_spark_session(spark, data_source_name=server.type)
scan.set_data_source_name(server.type)

else:
run.checks.append(Check(
type="general",
@@ -92,7 +101,8 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
for c in scan_results.get("checks"):
check = Check(
type="schema",
result="passed" if c.get("outcome") == "pass" else "failed" if c.get("outcome") == "fail" else c.get("outcome"),
result="passed" if c.get("outcome") == "pass" else "failed" if c.get("outcome") == "fail" else c.get(
"outcome"),
reason=', '.join(c.get("outcomeReasons")),
name=c.get("name"),
model=c.get("table"),
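The nested conditional in the schema-check loop above maps Soda outcomes onto the run's check results; written out as a standalone helper (purely illustrative, not part of the commit), the logic is:

```python
def to_result(outcome: str) -> str:
    # Soda reports "pass"/"fail"; the Check model uses "passed"/"failed".
    # Any other outcome (e.g. a warning) is passed through unchanged.
    if outcome == "pass":
        return "passed"
    if outcome == "fail":
        return "failed"
    return outcome
```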
51 changes: 51 additions & 0 deletions datacontract/engines/soda/connections/kafka.py
@@ -0,0 +1,51 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

from datacontract.model.data_contract_specification import \
DataContractSpecification, Server, Field


def create_spark_session():
spark = (SparkSession.builder.appName("datacontract")
.config("spark.streaming.stopGracefullyOnShutdown", True)
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
.getOrCreate())
print(f'Using PySpark version {spark.version}')
return spark


def read_kafka_topic(spark, data_contract: DataContractSpecification, server: Server):
host = server.host
topic = server.topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", host) \
.option("subscribe", topic) \
.load()
df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
schema = to_struct_type(data_contract)
# TODO A good warning when the conversion to json fails
df3 = df2.select(from_json(df2.value, schema).alias("json")).select(col("json.*"))
model_name, model = next(iter(data_contract.models.items()))
df3.createOrReplaceTempView(model_name)


def to_struct_type(data_contract: DataContractSpecification) -> StructType:
model_name, model = next(iter(data_contract.models.items()))
struct_fields = []
for field_name, field in model.fields.items():
struct_fields.append(to_struct_field(field_name, field))
return StructType(struct_fields)


def to_struct_field(field_name: str, field: Field) -> StructField:
if field.type == "string":
data_type = StringType()
elif field.type == "int":
data_type = IntegerType()
else:
data_type = DataType()

return StructField(field_name, data_type, nullable=not field.required)
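To illustrate the mapping above: for string and int fields like those in the example contract, to_struct_type produces a StructType of StringType/IntegerType fields, and the same from_json step used in read_kafka_topic can be tried on a static DataFrame without a broker. A sketch under those assumptions (field names are taken from the example contract, values are made up):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Equivalent to what to_struct_type yields for a model with
# sku: string and available: int (both optional, hence nullable=True).
schema = StructType([
    StructField("sku", StringType(), nullable=True),
    StructField("available", IntegerType(), nullable=True),
])

spark = SparkSession.builder.appName("datacontract-sketch").getOrCreate()
raw = spark.createDataFrame([('{"sku": "1-23-456", "available": 10}',)], ["value"])
parsed = raw.select(from_json(col("value"), schema).alias("json")).select(col("json.*"))
parsed.show()  # one row: sku="1-23-456", available=10
```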
1 change: 1 addition & 0 deletions datacontract/model/data_contract_specification.py
@@ -27,6 +27,7 @@ class Server(BaseModel):
host: str = None
port: int = None
catalog: str = None
topic: str = None
http_path: str = None # Use ENV variable
token: str = None # Use ENV variable
dataProductId: str = None
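With the new topic attribute, a Kafka server can be represented on the Server model roughly like this (a sketch; the values shown are illustrative, and in practice they come from the servers section of the data contract YAML):

```python
from datacontract.model.data_contract_specification import Server

# `host` and `type` already existed on Server; `topic` is the attribute added in this commit.
kafka_server = Server(type="kafka", host="localhost:9092", topic="json-topic")
```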
2 changes: 1 addition & 1 deletion tests/examples/kafka/datacontract.yaml
@@ -17,7 +17,7 @@ models:
updated_at:
type: string
available:
type: numeric
type: int
location:
type: string
sku:
13 changes: 5 additions & 8 deletions tests/test_examples_kafka.py
@@ -2,10 +2,10 @@

import six

# Fix for Python 3.12
if sys.version_info >= (3, 12, 1):
sys.modules['kafka.vendor.six.moves'] = six.moves


import json
import logging

@@ -15,7 +15,7 @@

from datacontract.data_contract import DataContract

logging.basicConfig(level=logging.DEBUG, force=True)
logging.basicConfig(level=logging.INFO, force=True)

datacontract = "examples/kafka/datacontract.yaml"

@@ -31,22 +31,19 @@ def remove_container():

request.addfinalizer(remove_container)

# skip this test

def test_examples_kafka(kafka_container: KafkaContainer):
send_messages_to_topic("examples/kafka/data/messages.json", 'json-topic')

assert True
data_contract_str = _setup_datacontract()
data_contract = DataContract(data_contract_str=data_contract_str)

run = data_contract.test()

print(run)
assert run.result == "warning"
# assert all(check.result == "passed" for check in run.checks)
assert run.result == "passed"


def send_messages_to_topic(messages_file_path : str, topic_name : str):
def send_messages_to_topic(messages_file_path: str, topic_name: str):
print(f"Sending messages from {messages_file_path} to Kafka topic {topic_name}")

producer = KafkaProducer(bootstrap_servers=kafka.get_bootstrap_server(),
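The producer call above is truncated in the diff; for reference, sending a single JSON message with kafka-python looks roughly like this (a sketch, not the repository's code; the message fields mirror the example contract, and the bootstrap address is a placeholder for the testcontainer's address):

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # the test obtains this from the Kafka testcontainer
    value_serializer=lambda m: json.dumps(m).encode("utf-8"),
)
message = {"sku": "1-23-456", "location": "Hamburg", "available": 10,
           "updated_at": "2024-03-01T10:00:00Z"}
producer.send("json-topic", message)
producer.flush()
```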
