From 9b77ee4014df44f7f0fad8b58a34c9bc35ed8569 Mon Sep 17 00:00:00 2001
From: mu <59917266+4eUeP@users.noreply.github.com>
Date: Fri, 20 Oct 2023 10:02:56 +0800
Subject: [PATCH] Add initial kafka blackbox tests (#1651)

---
 hstream-kafka/tests/blackbox/common.py        | 26 +++++++
 hstream-kafka/tests/blackbox/conftest.py      | 22 ++++++
 .../tests/blackbox/env/docker-compose.yaml    | 26 +++++++
 .../tests/blackbox/env/requirements.txt       |  3 +
 hstream-kafka/tests/blackbox/test_consumer.py | 69 +++++++++++++++++++
 5 files changed, 146 insertions(+)
 create mode 100644 hstream-kafka/tests/blackbox/common.py
 create mode 100644 hstream-kafka/tests/blackbox/conftest.py
 create mode 100644 hstream-kafka/tests/blackbox/env/docker-compose.yaml
 create mode 100644 hstream-kafka/tests/blackbox/env/requirements.txt
 create mode 100644 hstream-kafka/tests/blackbox/test_consumer.py

diff --git a/hstream-kafka/tests/blackbox/common.py b/hstream-kafka/tests/blackbox/common.py
new file mode 100644
index 000000000..b0cff7730
--- /dev/null
+++ b/hstream-kafka/tests/blackbox/common.py
@@ -0,0 +1,26 @@
+import socket
+import time
+from kafka.conn import BrokerConnection
+
+
+# Also: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html#kafka.KafkaClient.send
+def send_req(port, req, resp_count=1):
+    conn = BrokerConnection("localhost", port, socket.AF_UNSPEC)
+    conn.connect_blocking()
+    assert conn.connected()
+
+    assert len(conn.in_flight_requests) == 0
+    conn.send(req, blocking=True)
+    assert len(conn.in_flight_requests) == 1
+
+    resps = []
+    while len(resps) < resp_count:
+        time.sleep(0.2)
+        results = conn.recv()
+        _resps = [r[0] for r in results]
+        resps = [*resps, *_resps]
+
+    if resp_count == 1:
+        return resps[0]
+    else:
+        return resps
diff --git a/hstream-kafka/tests/blackbox/conftest.py b/hstream-kafka/tests/blackbox/conftest.py
new file mode 100644
index 000000000..f62b08769
--- /dev/null
+++ b/hstream-kafka/tests/blackbox/conftest.py
@@ -0,0 +1,22 @@
+import pytest
+from kafka import KafkaAdminClient
+
+
+@pytest.fixture
+def kafka_port():
+    return 19092
+
+
+@pytest.fixture
+def hstream_kafka_port():
+    return 19093
+
+
+@pytest.fixture
+def kafka_admin_client(kafka_port):
+    return KafkaAdminClient(bootstrap_servers=f"localhost:{kafka_port}")
+
+
+@pytest.fixture
+def hstream_kafka_admin_client(hstream_kafka_port):
+    return KafkaAdminClient(bootstrap_servers=f"localhost:{hstream_kafka_port}")
diff --git a/hstream-kafka/tests/blackbox/env/docker-compose.yaml b/hstream-kafka/tests/blackbox/env/docker-compose.yaml
new file mode 100644
index 000000000..792e2fe38
--- /dev/null
+++ b/hstream-kafka/tests/blackbox/env/docker-compose.yaml
@@ -0,0 +1,26 @@
+# $ HSTREAM_IMAGE_NAME=your-image docker compose up ...
+version: "2"
+name: hstream-kafka-blackbox-tests
+
+services:
+  kafka:
+    image: docker.io/bitnami/kafka:3.5
+    ports:
+      - "19092:9092"
+    volumes:
+      - "kafka_data:/bitnami"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:19092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+
+volumes:
+  kafka_data:
+    driver: local
diff --git a/hstream-kafka/tests/blackbox/env/requirements.txt b/hstream-kafka/tests/blackbox/env/requirements.txt
new file mode 100644
index 000000000..811850912
--- /dev/null
+++ b/hstream-kafka/tests/blackbox/env/requirements.txt
@@ -0,0 +1,3 @@
+pytest==7.4.2
+kafka-python==2.0.2
+confluent-kafka==2.2.0
diff --git a/hstream-kafka/tests/blackbox/test_consumer.py b/hstream-kafka/tests/blackbox/test_consumer.py
new file mode 100644
index 000000000..6e717bd50
--- /dev/null
+++ b/hstream-kafka/tests/blackbox/test_consumer.py
@@ -0,0 +1,69 @@
+import pytest
+
+import kafka
+from kafka import KafkaConsumer
+from kafka.admin import NewTopic
+from kafka.structs import TopicPartition
+from kafka.protocol.fetch import FetchRequest
+
+from common import send_req
+
+topic_name = "blackbox_test_topic"
+
+
+def force_create_topic(admin, topic_name):
+    try:
+        admin.delete_topics([topic_name])
+    except kafka.errors.UnknownTopicOrPartitionError:
+        pass
+    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
+    admin.create_topics([topic])
+
+
+@pytest.fixture
+def new_topic(
+    kafka_admin_client, hstream_kafka_admin_client, topic_name=topic_name
+):
+    force_create_topic(kafka_admin_client, topic_name)
+    force_create_topic(hstream_kafka_admin_client, topic_name)
+    yield
+    kafka_admin_client.delete_topics([topic_name])
+    hstream_kafka_admin_client.delete_topics([topic_name])
+
+
+def test_fetch_empty_topics(kafka_port, hstream_kafka_port):
+    req = FetchRequest[2](
+        -1,  # replica_id
+        2000,  # fetch_max_wait_ms
+        0,  # fetch_min_bytes
+        [],
+    )
+    kafka_resp = send_req(kafka_port, req)
+    hstream_kafka_resp = send_req(hstream_kafka_port, req)
+    assert kafka_resp == hstream_kafka_resp
+
+
+def test_offsets_of_empty_topic(new_topic, kafka_port, hstream_kafka_port):
+    kafka_consumer = KafkaConsumer(
+        topic_name,
+        bootstrap_servers=f"localhost:{kafka_port}",
+        auto_offset_reset="earliest",
+        enable_auto_commit=False,
+        fetch_max_wait_ms=1000,
+        api_version=(0, 10),
+    )
+    hstream_kafka_consumer = KafkaConsumer(
+        topic_name,
+        bootstrap_servers=f"localhost:{hstream_kafka_port}",
+        auto_offset_reset="earliest",
+        enable_auto_commit=False,
+        fetch_max_wait_ms=1000,
+        api_version=(0, 10),
+    )
+    partition = TopicPartition(topic_name, 0)
+    assert kafka_consumer.beginning_offsets(
+        [partition]
+    ) == hstream_kafka_consumer.beginning_offsets([partition])
+    assert kafka_consumer.end_offsets(
+        [partition]
+    ) == hstream_kafka_consumer.end_offsets([partition])
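
These tests treat HStream's Kafka-compatible endpoint as a black box: each test sends the same request (or issues the same client call) against a reference Apache Kafka broker (the bitnami container above, on port 19092) and against HStream (on port 19093), then asserts the results are equal. To run them, the env/docker-compose.yaml environment and an HStream server presumably need to be up, with the packages from env/requirements.txt installed and pytest invoked from the hstream-kafka/tests/blackbox directory so that common.py and conftest.py are importable; the patch itself does not ship a runner script.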
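
Below is a minimal interactive sketch of the same request/response comparison pattern that common.py enables. The choice of MetadataRequest, the hard-coded ports, and running it from the blackbox directory are assumptions for illustration only; the patch's own tests use FetchRequest and the pytest fixtures instead.

    # Usage sketch (not part of the patch): issue one metadata request to both
    # brokers through the send_req helper and decode the replies. Assumes it is
    # run from hstream-kafka/tests/blackbox with the compose environment up on
    # 19092 and an HStream Kafka listener on 19093.
    from kafka.protocol.metadata import MetadataRequest

    from common import send_req

    # v0 metadata request; an empty topic list asks for metadata on all topics.
    req = MetadataRequest[0]([])

    kafka_meta = send_req(19092, req)
    hstream_kafka_meta = send_req(19093, req)

    # Broker ids and advertised addresses legitimately differ between the two
    # clusters, so inspect the topic metadata here rather than asserting
    # equality on the whole response as test_fetch_empty_topics does.
    print(kafka_meta.topics)
    print(hstream_kafka_meta.topics)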