Skip to content

Commit

Permalink
Started to develop kafka testing
Browse files Browse the repository at this point in the history
  • Loading branch information
simonharrer committed Feb 29, 2024
1 parent c32c012 commit cb358ac
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dev = [
"pytest",
"testcontainers-minio",
"testcontainers-postgres",
"testcontainers-kafka",
]

[project.urls]
Expand Down
10 changes: 10 additions & 0 deletions tests/examples/kafka/data/messages.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"updated_at":"2022-04-20T13:50:34.228811Z","available":17,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.589142Z","available":16,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.589501Z","available":15,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.589771Z","available":14,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.590008Z","available":13,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.590261Z","available":12,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.590559Z","available":11,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.590831Z","available":12,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.591076Z","available":11,"location":"18","sku":"9521582929054"}
{"updated_at":"2022-04-20T13:50:34.591308Z","available":10,"location":"18","sku":"9521582929054"}
24 changes: 24 additions & 0 deletions tests/examples/kafka/datacontract.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
dataContractSpecification: 0.9.2
id: inventory-events
info:
title: Inventory Events
version: 0.0.1
servers:
production:
type: kafka
topic: inventory-events
host: __KAFKA_HOST__
dataProductId: inventory
outputPortId: s3
models:
inventory:
type: table
fields:
updated_at:
type: string
available:
type: numeric
location:
type: string
sku:
type: string
63 changes: 63 additions & 0 deletions tests/test_examples_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
import logging
import os

import pytest
from kafka import KafkaProducer
from testcontainers.kafka import KafkaContainer

from datacontract.data_contract import DataContract

logging.basicConfig(level=logging.DEBUG, force=True)

datacontract = "examples/kafka/datacontract.yaml"

kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0")


@pytest.fixture(scope="module", autouse=True)
def kafka_container(request):
kafka.start()

def remove_container():
kafka.stop()

request.addfinalizer(remove_container)

# skip this test
def test_examples_kafka(kafka_container: KafkaContainer):
send_messages_to_topic("examples/kafka/data/messages.json", 'json-topic')

assert True
data_contract_str = _setup_datacontract()
data_contract = DataContract(data_contract_str=data_contract_str)

run = data_contract.test()

print(run)
assert run.result == "warning"
# assert all(check.result == "passed" for check in run.checks)


def send_messages_to_topic(messages_file_path : str, topic_name : str):
print(f"Sending messages from {messages_file_path} to Kafka topic {topic_name}")

producer = KafkaProducer(bootstrap_servers=kafka.get_bootstrap_server(),
value_serializer=lambda m: json.dumps(m).encode('ascii'))
messages_sent = 0

with open(messages_file_path) as messages_file:
for line in messages_file:
message = json.loads(line)
producer.send(topic_name, message)
messages_sent += 1

producer.flush()
print(f"Sent {messages_sent} messages from {messages_file_path} to Kafka topic {topic_name}")


def _setup_datacontract():
with open(datacontract) as data_contract_file:
data_contract_str = data_contract_file.read()
host = kafka.get_bootstrap_server()
return data_contract_str.replace("__KAFKA_HOST__", host)

0 comments on commit cb358ac

Please sign in to comment.