Improve support for Kafka
jochenchrist committed Mar 2, 2024
1 parent 11349b5 commit 69f21e0
Showing 8 changed files with 172 additions and 20 deletions.
28 changes: 26 additions & 2 deletions README.md
@@ -8,7 +8,7 @@
<a href="https://datacontract.com/slack" rel="nofollow"><img src="https://camo.githubusercontent.com/5ade1fd1e76a6ab860802cdd2941fe2501e2ca2cb534e5d8968dbf864c13d33d/68747470733a2f2f696d672e736869656c64732e696f2f62616467652f736c61636b2d6a6f696e5f636861742d77686974652e7376673f6c6f676f3d736c61636b267374796c653d736f6369616c" alt="Slack Status" data-canonical-src="https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&amp;style=social" style="max-width: 100%;"></a>
</p>

The `datacontract` CLI is an open source command-line tool for working with [Data Contracts](https://datacontract.com/).
It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different formats. The tool is written in Python. It can be used as a standalone CLI tool, in a CI/CD pipeline, or directly as a Python library.


@@ -343,6 +343,30 @@ run = data_contract.test()
run.result
```

### Kafka

Kafka support is currently considered experimental.

#### Example

datacontract.yaml
```yaml
servers:
  production:
    type: kafka
    host: abc-12345.eu-central-1.aws.confluent.cloud:9092
    topic: my-topic-name
    format: json
```

#### Environment Variables

| Environment Variable | Example | Description |
|------------------------------------|---------|-----------------------------|
| `DATACONTRACT_KAFKA_SASL_USERNAME` | `xxx` | The SASL username (key). |
| `DATACONTRACT_KAFKA_SASL_PASSWORD` | `xxx` | The SASL password (secret). |
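
With both variables set, a minimal test run against the Kafka server might look like this (a sketch using the Python API shown above; the contract file name `datacontract.yaml` is an assumption):

```python
import os

from datacontract.data_contract import DataContract

# SASL credentials for the Kafka server (illustrative values)
os.environ["DATACONTRACT_KAFKA_SASL_USERNAME"] = "xxx"
os.environ["DATACONTRACT_KAFKA_SASL_PASSWORD"] = "xxx"

# Runs the schema and quality tests against the Kafka topic defined in the contract
data_contract = DataContract(data_contract_file="datacontract.yaml")
run = data_contract.test()
print(run.result)
```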



### Exports

@@ -460,4 +484,4 @@ Created by [Stefan Negele](https://www.linkedin.com/in/stefan-negele-573153112/)



<a href="https://github.com/datacontract/cli" class="github-corner" aria-label="View source on GitHub"><svg width="80" height="80" viewBox="0 0 250 250" style="fill:#151513; color:#fff; position: absolute; top: 0; border: 0; right: 0;" aria-hidden="true"><path d="M0,0 L115,115 L130,115 L142,142 L250,250 L250,0 Z"></path><path d="M128.3,109.0 C113.8,99.7 119.0,89.6 119.0,89.6 C122.0,82.7 120.5,78.6 120.5,78.6 C119.2,72.0 123.4,76.3 123.4,76.3 C127.3,80.9 125.5,87.3 125.5,87.3 C122.9,97.6 130.6,101.9 134.4,103.2" fill="currentColor" style="transform-origin: 130px 106px;" class="octo-arm"></path><path d="M115.0,115.0 C114.9,115.1 118.7,116.5 119.8,115.4 L133.7,101.6 C136.9,99.2 139.9,98.4 142.2,98.6 C133.8,88.0 127.5,74.4 143.8,58.0 C148.5,53.4 154.0,51.2 159.7,51.0 C160.3,49.4 163.2,43.6 171.4,40.1 C171.4,40.1 176.1,42.5 178.8,56.2 C183.1,58.6 187.2,61.8 190.9,65.4 C194.5,69.0 197.7,73.2 200.1,77.6 C213.8,80.2 216.3,84.9 216.3,84.9 C212.7,93.1 206.9,96.0 205.4,96.6 C205.1,102.4 203.0,107.8 198.3,112.5 C181.9,128.9 168.3,122.5 157.7,114.1 C157.9,116.9 156.7,120.9 152.7,124.9 L141.0,136.5 C139.8,137.7 141.6,141.9 141.8,141.8 Z" fill="currentColor" class="octo-body"></path></svg></a><style>.github-corner:hover .octo-arm{animation:octocat-wave 560ms ease-in-out}@keyframes octocat-wave{0%,100%{transform:rotate(0)}20%,60%{transform:rotate(-25deg)}40%,80%{transform:rotate(10deg)}}@media (max-width:500px){.github-corner:hover .octo-arm{animation:none}.github-corner .octo-arm{animation:octocat-wave 560ms ease-in-out}}</style>
2 changes: 1 addition & 1 deletion datacontract/data_contract.py
@@ -121,7 +121,7 @@ def test(self) -> Run:
            # 6. check server credentials are complete
            if server.format == "json":
                check_jsonschema(run, data_contract, server)
            check_soda_execute(run, data_contract, server, self._spark)
            check_soda_execute(run, data_contract, server, self._spark, tmp_dir)

        except DataContractException as e:
            run.checks.append(Check(
6 changes: 3 additions & 3 deletions datacontract/engines/soda/check_soda_execute.py
@@ -20,7 +20,7 @@
Run, Check, Log


def check_soda_execute(run: Run, data_contract: DataContractSpecification, server: Server, spark):
def check_soda_execute(run: Run, data_contract: DataContractSpecification, server: Server, spark, tmp_dir):
    if data_contract is None:
        run.log_warn("Cannot run engine soda-core, as data contract is invalid")
        return
@@ -67,8 +67,8 @@ def check_soda_execute(run: Run, data_contract: DataContractSpecification, serve
        scan.set_data_source_name(server.type)
    elif server.type == "kafka":
        if spark is None:
            spark = create_spark_session()
        read_kafka_topic(spark, data_contract, server)
            spark = create_spark_session(tmp_dir)
        read_kafka_topic(spark, data_contract, server, tmp_dir)
        scan.add_spark_session(spark, data_source_name=server.type)
        scan.set_data_source_name(server.type)

78 changes: 65 additions & 13 deletions datacontract/engines/soda/connections/kafka.py
@@ -1,3 +1,5 @@
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *
@@ -6,45 +8,95 @@
DataContractSpecification, Server, Field


def create_spark_session():
    spark = (SparkSession.builder.appName("datacontract")
             .config("spark.streaming.stopGracefullyOnShutdown", True)
             .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
             .getOrCreate())
def create_spark_session(tmp_dir) -> SparkSession:
    spark = SparkSession.builder.appName("datacontract") \
        .config("spark.sql.warehouse.dir", tmp_dir + "/spark-warehouse") \
        .config("spark.streaming.stopGracefullyOnShutdown", True) \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0') \
        .getOrCreate()
    print(f'Using PySpark version {spark.version}')
    return spark


def read_kafka_topic(spark, data_contract: DataContractSpecification, server: Server):
def read_kafka_topic(spark: SparkSession, data_contract: DataContractSpecification, server: Server, tmp_dir):
    host = server.host
    topic = server.topic
    auth_options = get_auth_options()

    df = spark \
        .readStream \
        .read \
        .format("kafka") \
        .options(**auth_options) \
        .option("kafka.bootstrap.servers", host) \
        .option("subscribe", topic) \
        .option("startingOffsets", "earliest") \
        .load()
    df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    name, model1 = next(iter(data_contract.models.items()))
    schema = to_struct_type(model1.fields)
    # TODO: log a clear warning when the conversion to JSON fails
    df3 = df2.select(from_json(df2.value, schema).alias("json")).select(col("json.*"))
    model_name, model = next(iter(data_contract.models.items()))
    # df3.writeStream.toTable(model_name, checkpointLocation=tmp_dir + "/checkpoint")
    df3.createOrReplaceTempView(model_name)

    spark.sql(f"select * from {model_name}").show()
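
In isolation, the JSON flattening step behaves as below. This is a self-contained sketch, not part of the commit; the two-field schema and the sample record are hypothetical:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("from-json-demo").getOrCreate()

# Hypothetical Kafka records after the CAST to STRING
df2 = spark.createDataFrame(
    [("key-1", '{"itemid": "widget", "orderid": 42}')], ["key", "value"])

schema = StructType([
    StructField("itemid", StringType()),
    StructField("orderid", IntegerType()),
])

# Same pattern as read_kafka_topic: parse the JSON payload and flatten it
df3 = df2.select(from_json(df2.value, schema).alias("json")).select(col("json.*"))
df3.show()  # columns: itemid, orderid
```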

def to_struct_type(data_contract: DataContractSpecification) -> StructType:
    model_name, model = next(iter(data_contract.models.items()))

def get_auth_options():
    kafka_sasl_username = os.getenv('DATACONTRACT_KAFKA_SASL_USERNAME')
    kafka_sasl_password = os.getenv('DATACONTRACT_KAFKA_SASL_PASSWORD')
    if kafka_sasl_username is None:
        auth_options = {}
    else:
        kafka_sasl_jaas_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_sasl_username}" password="{kafka_sasl_password}";'
        auth_options = {
            "kafka.sasl.mechanism": "PLAIN",
            "kafka.security.protocol": "SASL_SSL",
            "kafka.sasl.jaas.config": kafka_sasl_jaas_config,
        }
    return auth_options
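
For a typical Confluent Cloud API key, the returned options would look like this (illustrative values only):

```python
{
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";',
}
```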


def to_struct_type(fields):
    struct_fields = []
    for field_name, field in model.fields.items():
    for field_name, field in fields.items():
        struct_fields.append(to_struct_field(field_name, field))
    return StructType(struct_fields)


def to_struct_field(field_name: str, field: Field) -> StructField:
    if field.type == "string":
    if field.type is None:
        data_type = DataType()
    elif field.type in ["string", "varchar", "text"]:
        data_type = StringType()
    elif field.type == "int":
    elif field.type in ["number", "decimal", "numeric"]:
        data_type = DecimalType()
    elif field.type in ["float", "double"]:
        data_type = DoubleType()
    elif field.type in ["integer", "int"]:
        data_type = IntegerType()
    elif field.type in ["long", "bigint"]:
        data_type = LongType()
    elif field.type in ["boolean"]:
        data_type = BooleanType()
    elif field.type in ["timestamp", "timestamp_tz"]:
        data_type = TimestampType()
    elif field.type in ["timestamp_ntz"]:
        data_type = TimestampNTZType()
    elif field.type in ["date"]:
        data_type = DateType()
    elif field.type in ["time"]:
        # Spark has no dedicated time-of-day type, fall back to the generic DataType
        data_type = DataType()
    elif field.type in ["object", "record", "struct"]:
        data_type = to_struct_type(field.fields)
    elif field.type in ["binary"]:
        data_type = BinaryType()
    elif field.type in ["array"]:
        # TODO support array structs; ArrayType requires an element type
        data_type = ArrayType(NullType())
    elif field.type in ["null"]:
        data_type = NullType()
    else:
        data_type = DataType()
38 changes: 38 additions & 0 deletions tests/examples/kafka-remote/datacontract.yaml
@@ -0,0 +1,38 @@
dataContractSpecification: 0.9.2
id: orders
info:
  title: Orders
  version: 0.0.1
  description: Order messages as generated by Confluent Datagen Source Adapter
servers:
  production:
    type: kafka
    topic: datamesh.orders.v1
    host: pkc-7xoy1.eu-central-1.aws.confluent.cloud:9092
models:
  orders:
    type: table
    fields:
      ordertime:
        type: bigint
      orderid:
        type: int
      itemid:
        type: string
      orderunits:
        type: double
      address:
        type: object
        fields:
          city:
            type: string
          state:
            type: string
          zipcode:
            type: string
quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count >= 5000

5 changes: 5 additions & 0 deletions tests/examples/kafka/datacontract.yaml
@@ -22,3 +22,8 @@ models:
        type: string
      sku:
        type: string
quality:
  type: SodaCL
  specification:
    checks for inventory:
      - row_count >= 10
2 changes: 1 addition & 1 deletion tests/test_examples_kafka.py
@@ -33,7 +33,7 @@ def remove_container():


def test_examples_kafka(kafka_container: KafkaContainer):
    send_messages_to_topic("examples/kafka/data/messages.json", 'json-topic')
    send_messages_to_topic("examples/kafka/data/messages.json", "inventory-events")
    data_contract_str = _setup_datacontract()
    data_contract = DataContract(data_contract_str=data_contract_str)

33 changes: 33 additions & 0 deletions tests/test_examples_kafka_remote.py
@@ -0,0 +1,33 @@
import os
import sys

import pytest
import six

# Fix for Python 3.12
if sys.version_info >= (3, 12, 1):
    sys.modules['kafka.vendor.six.moves'] = six.moves

import logging

from dotenv import load_dotenv

from datacontract.data_contract import DataContract

logging.basicConfig(level=logging.INFO, force=True)

datacontract = "examples/kafka-remote/datacontract.yaml"


@pytest.mark.skipif(os.environ.get("DATACONTRACT_KAFKA_SASL_USERNAME") is None,
                    reason="Requires DATACONTRACT_KAFKA_SASL_USERNAME to be set")
def _test_examples_kafka_remote():
    load_dotenv(override=True)
    # os.environ['DATACONTRACT_KAFKA_SASL_USERNAME'] = "xxx"
    # os.environ['DATACONTRACT_KAFKA_SASL_PASSWORD'] = "xxx"
    data_contract = DataContract(data_contract_file=datacontract)

    run = data_contract.test()

    print(run)
    assert run.result == "passed"
