Skip to content

Commit

Permalink
Add initial kafka blackbox tests (#1651)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Oct 20, 2023
1 parent 07e2236 commit 9b77ee4
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 0 deletions.
26 changes: 26 additions & 0 deletions hstream-kafka/tests/blackbox/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import socket
import time
from kafka.conn import BrokerConnection


# Also: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html#kafka.KafkaClient.send
def send_req(port, req, resp_count=1):
conn = BrokerConnection("localhost", port, socket.AF_UNSPEC)
conn.connect_blocking()
assert conn.connected()

assert len(conn.in_flight_requests) == 0
conn.send(req, blocking=True)
assert len(conn.in_flight_requests) == 1

resps = []
while len(resps) < resp_count:
time.sleep(0.2)
results = conn.recv()
_resps = [r[0] for r in results]
resps = [*resps, *_resps]

if resp_count == 1:
return resps[0]
else:
return resps
22 changes: 22 additions & 0 deletions hstream-kafka/tests/blackbox/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import pytest
from kafka import KafkaAdminClient


@pytest.fixture
def kafka_port():
return 19092


@pytest.fixture
def hstream_kafka_port():
return 19093


@pytest.fixture
def kafka_admin_client(kafka_port):
return KafkaAdminClient(bootstrap_servers=f"localhost:{kafka_port}")


@pytest.fixture
def hstream_kafka_admin_client(hstream_kafka_port):
return KafkaAdminClient(bootstrap_servers=f"localhost:{hstream_kafka_port}")
26 changes: 26 additions & 0 deletions hstream-kafka/tests/blackbox/env/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# $ HSTREAM_IMAGE_NAME=your-image docker compose up ...
version: "2"
name: hstream-kafka-blackbox-tests

services:
kafka:
image: docker.io/bitnami/kafka:3.5
ports:
- "19092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:19092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

volumes:
kafka_data:
driver: local
3 changes: 3 additions & 0 deletions hstream-kafka/tests/blackbox/env/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pytest==7.4.2
kafka-python==2.0.2
confluent-kafka==2.2.0
69 changes: 69 additions & 0 deletions hstream-kafka/tests/blackbox/test_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest

import kafka
from kafka import KafkaConsumer
from kafka.admin import NewTopic
from kafka.structs import TopicPartition
from kafka.protocol.fetch import FetchRequest

from common import send_req

topic_name = "blackbox_test_topic"


def force_create_topic(admin, topic_name):
try:
admin.delete_topics([topic_name])
except kafka.errors.UnknownTopicOrPartitionError:
pass
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
admin.create_topics([topic])


@pytest.fixture
def new_topic(
kafka_admin_client, hstream_kafka_admin_client, topic_name=topic_name
):
force_create_topic(kafka_admin_client, topic_name)
force_create_topic(hstream_kafka_admin_client, topic_name)
yield
kafka_admin_client.delete_topics([topic_name])
hstream_kafka_admin_client.delete_topics([topic_name])


def test_fetch_empty_topics(kafka_port, hstream_kafka_port):
req = FetchRequest[2](
-1, # replica_id
2000, # fetch_max_wait_ms
0, # fetch_min_bytes
[],
)
kafka_resp = send_req(kafka_port, req)
hstream_kafka_resp = send_req(hstream_kafka_port, req)
assert kafka_resp == hstream_kafka_resp


def test_offsets_of_empty_topic(new_topic, kafka_port, hstream_kafka_port):
kafka_consumer = KafkaConsumer(
topic_name,
bootstrap_servers=f"localhost:{kafka_port}",
auto_offset_reset="earliest",
enable_auto_commit=False,
fetch_max_wait_ms=1000,
api_version=(0, 10),
)
hstream_kafka_consumer = KafkaConsumer(
topic_name,
bootstrap_servers=f"localhost:{hstream_kafka_port}",
auto_offset_reset="earliest",
enable_auto_commit=False,
fetch_max_wait_ms=1000,
api_version=(0, 10),
)
partition = TopicPartition(topic_name, 0)
assert kafka_consumer.beginning_offsets(
[partition]
) == hstream_kafka_consumer.beginning_offsets([partition])
assert kafka_consumer.end_offsets(
[partition]
) == hstream_kafka_consumer.end_offsets([partition])

0 comments on commit 9b77ee4

Please sign in to comment.