diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index fd46fcd..c129ea9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -121,7 +121,27 @@ jobs: ./tests/wait_hserver.sh sleep 5 - - name: Run examples + - name: Run hstream examples run: | python3 -m pip install hstreamdb python3 examples/py/snippets/guides.py + + test-python-kafka-examples: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Start cluster + run: | + docker compose -f ./assets/quick-start-kafka-lite.yaml up -d + sleep 5 + ./tests/wait_hstream_kafka.sh + sleep 5 + + - name: Run hstream kafka examples + run: | + python3 -m pip install kafka-python + python3 kafka-examples/python/snippets/kafka_python.py + + python3 -m pip install confluent-kafka + python3 kafka-examples/python/snippets/confluent_kafka_python.py diff --git a/assets/quick-start-kafka-lite.yaml b/assets/quick-start-kafka-lite.yaml new file mode 100644 index 0000000..9b085c3 --- /dev/null +++ b/assets/quick-start-kafka-lite.yaml @@ -0,0 +1,74 @@ +version: "3.5" + +services: + hserver: + image: hstreamdb/hstream:latest + depends_on: + - zookeeper + - hstore + ports: + - "127.0.0.1:9092:9092" + expose: + - 9092 + networks: + - hstream-quickstart + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - /tmp:/tmp + - data_store:/data/store + command: + - bash + - "-c" + - | + set -e + /usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \ + /usr/local/bin/hstream-server kafka \ + --bind-address 0.0.0.0 --port 9092 \ + --gossip-port 6571 \ + --server-id 100 \ + --seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \ + --advertised-address $$(hostname -I | awk '{print $$1}') \ + --metastore-uri zk://zookeeper:2181 \ + --store-config /data/store/logdevice.conf \ + --store-log-level warning + + hstore: + image: hstreamdb/hstream:latest + networks: + - hstream-quickstart + volumes: + - data_store:/data/store + command: + - bash + - "-c" + - | + set -ex + # N.B. "enable-dscp-reflection=false" is required for linux kernel which + # doesn't support dscp reflection, e.g. centos7. + /usr/local/bin/ld-dev-cluster --root /data/store \ + --use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \ + --user-admin-port 6440 \ + --param enable-dscp-reflection=false \ + --no-interactive + + zookeeper: + image: zookeeper:3.7 + expose: + - 2181 + networks: + - hstream-quickstart + volumes: + - data_zk_data:/data + - data_zk_datalog:/datalog + +networks: + hstream-quickstart: + name: hstream-quickstart + +volumes: + data_store: + name: quickstart_data_store + data_zk_data: + name: quickstart_data_zk_data + data_zk_datalog: + name: quickstart_data_zk_datalog diff --git a/docs/develop-with-kafka-api/_index.md b/docs/develop-with-kafka-api/_index.md index 69b5f99..396af55 100644 --- a/docs/develop-with-kafka-api/_index.md +++ b/docs/develop-with-kafka-api/_index.md @@ -1,5 +1,5 @@ --- -order: ['compatibility.md', 'java.md'] +order: ['compatibility.md', 'java.md', 'python.md'] collapsed: false --- diff --git a/docs/develop-with-kafka-api/compatibility.md b/docs/develop-with-kafka-api/compatibility.md index 735b583..5ecc334 100644 --- a/docs/develop-with-kafka-api/compatibility.md +++ b/docs/develop-with-kafka-api/compatibility.md @@ -2,24 +2,31 @@ ## Overview -HStream also supports Kafka API since 0.19.0, so users can connect to HStream using Kafka clients. 
HStream implements [Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not need to change any code in your current Kafka applications, just updating the Kafka URLs in your configurations to point to a HStream cluster, and that is it, then you can start streaming from your Kafka applications to a HStream cluster.
+Since 0.19.0, HStream has also supported the Kafka API, so users can connect
+to HStream using Kafka clients. HStream implements the
+[Kafka protocol](https://kafka.apache.org/protocol.html) under the hood, so you
+do not need to change any code in your current Kafka applications: just update
+the Kafka URLs in your configurations to point to an HStream cluster, and you
+can start streaming from your Kafka applications to HStream.

::: tip

-Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to learn how to enable HStream'support of Kafka API.
+Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to
+learn how to enable HStream's support of the Kafka API.

:::

## Compatibility with Apache Kafka

-HStream supports Apache Kafka version 0.11 and later, and most Kafka clients should be able to auto-negotiate protocol versions.
+HStream supports Apache Kafka version 0.11 and later, and most Kafka clients
+should be able to auto-negotiate protocol versions.

-Currenty, the clients below are tested by HStream.
+Currently, the clients below have been tested with HStream.

| Language | Kafka Client |
| -------- | ----------------------------------------------------------- |
| Java | [Apache Kafka Java Client](https://github.com/apache/kafka) |
-| Python | [kafka-python](https://github.com/dpkp/kafka-python) |
+| Python | [kafka-python], [confluent-kafka-python] |
| Go | [franz-go](https://github.com/twmb/franz-go) |
| C/C++ | [librdkafka](https://github.com/confluentinc/librdkafka) |

@@ -31,13 +38,18 @@ Recommand using the latest version of each Kafka client

## Features not supported in Apache Kafka

-HStream do not support below Kafka features now(we plan to support them in the later version):
+HStream does not support the following Kafka features yet (we plan to support
+them in a later version):

- Kafka transactions
- Quotas in Kafka

::: tip

-The configuration of Kafka brokers is not applicable to HStream, as HStream is a completely different implementation.
+The configuration of Kafka brokers is not applicable to HStream, as HStream is
+a completely different implementation.

:::
+
+[kafka-python]: https://github.com/dpkp/kafka-python
+[confluent-kafka-python]: https://github.com/confluentinc/confluent-kafka-python
diff --git a/docs/develop-with-kafka-api/python.md b/docs/develop-with-kafka-api/python.md
new file mode 100644
index 0000000..439f035
--- /dev/null
+++ b/docs/develop-with-kafka-api/python.md
@@ -0,0 +1,60 @@
+# Develop with Python Kafka Clients
+
+This guide shows you how to use Python Kafka clients to interact with
+HStream. Currently, we support [kafka-python] and [confluent-kafka].
+
+## Installation
+
+```sh
+# If you want to use kafka-python
+pip install kafka-python
+
+# Or if you want to use confluent-kafka
+pip install confluent-kafka
+```
+
+::: tip
+
+Prefer to use a virtual environment? Check out Python's built-in
+[venv](https://docs.python.org/3/library/venv.html).
+
+:::
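+
+## Verify the Connection
+
+Before creating topics, you can check that the cluster is reachable. The
+snippet below is a minimal sketch using kafka-python; it assumes an HStream
+cluster with the Kafka API enabled is listening on `127.0.0.1:9092` (adjust
+the address to match your deployment):
+
+```python
+from kafka import KafkaAdminClient
+
+# Assumption: HStream's Kafka API is reachable at 127.0.0.1:9092
+admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
+print(admin.list_topics())  # prints the names of the existing topics
+admin.close()
+```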
+
+## Create a Topic
+
+::: code-group
+
+
+@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic
+
+
+@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common create-topic
+
+:::
+
+## Produce Records
+
+::: code-group
+
+
+@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common produce
+
+
+@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common produce
+
+:::
+
+## Consume Records
+
+::: code-group
+
+
+@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common consume
+
+
+@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common consume
+
+:::
+
+[kafka-python]: https://github.com/dpkp/kafka-python
+[confluent-kafka]: https://github.com/confluentinc/confluent-kafka-python
diff --git a/docs/develop/_index.md b/docs/develop/_index.md
index 28531c5..3a1bccf 100644
--- a/docs/develop/_index.md
+++ b/docs/develop/_index.md
@@ -3,4 +3,4 @@ order: ['write', 'receive', 'process', 'ingest-and-distribute']
collapsed: false
---

-Develope
+Develop
diff --git a/embed.sh b/embed.sh
index 6c41452..1c49ad0 100755
--- a/embed.sh
+++ b/embed.sh
@@ -4,4 +4,5 @@ if [[ "$EMBED_ME_ARGS" == "" ]]; then
    ./include_snippets.py --snippets-root . --file-pattern "docs/**/write/*.md"
    ./include_snippets.py --snippets-root . --file-pattern "docs/**/receive/*.md"
+    ./include_snippets.py --snippets-root . --file-pattern "docs/**/python.md"
fi
diff --git a/include_snippets.py b/include_snippets.py
index 81a0377..77c9253 100755
--- a/include_snippets.py
+++ b/include_snippets.py
@@ -2,10 +2,10 @@

import os

-FILE_TYPES = {".py": "python", "cpp": "cpp"}
+FILE_TYPES = {".py": ("python", "[Python]"), ".cpp": ("cpp", "[C++]")}


-def run_snippet_cmd(snippets_root, snippet_file, *labels):
+def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None):
    lines = [[] for _ in labels]
    with open(os.path.join(snippets_root, snippet_file), "r") as f:
        idx = None
@@ -24,12 +24,22 @@ def run_snippet_cmd(snippets_root, snippet_file, *labels):
            lines[idx].append(line)
    # TODO: strip indent
    extension = os.path.splitext(snippet_file)[1]
-    ft = FILE_TYPES[extension]
+    (ft, _code_group) = FILE_TYPES[extension]
+    code_group = _code_group if code_group is None else code_group
    blocks = "\n\n\n".join("".join(xs).strip() for xs in lines)
-    return f"```{ft} [Python]\n{blocks}\n```\n"
+    return f"```{ft} {code_group}\n{blocks}\n```\n"


-support_commands = [("@snippet", run_snippet_cmd)]
+def run_snippet_cmd_group(snippets_root, snippet_file, group, *labels):
+    return run_snippet_cmd(
+        snippets_root, snippet_file, *labels, code_group=group
+    )
+
+
+support_commands = [
+    ("@snippet", run_snippet_cmd),
+    ("@snippet_group", run_snippet_cmd_group),
+]


def parse_commands(lines):
diff --git a/kafka-examples/python/snippets/confluent_kafka_python.py b/kafka-examples/python/snippets/confluent_kafka_python.py
new file mode 100644
index 0000000..377d038
--- /dev/null
+++ b/kafka-examples/python/snippets/confluent_kafka_python.py
@@ -0,0 +1,106 @@
+# [common]
+
+import os
+from confluent_kafka import Producer, Consumer
+from confluent_kafka.admin import AdminClient, NewTopic
+
+# NOTE: Replace with your own host and port
+host = os.getenv("GUIDE_HOST", "127.0.0.1")
+port = os.getenv("GUIDE_PORT", 9092)
+
+topic_name = "my_topic"
+group_id = "confluent_kafka_group"
+
+conf = {
+    "bootstrap.servers": host + ":" + str(port),
+    "client.id": "confluent_kafka_client",
+}
+# [common]
+
+# NOTE: The bracketed markers such as "# [common]" delimit snippet sections
+# that include_snippets.py extracts into the docs via @snippet and
+# @snippet_group.
+
+
+# [create-topic]
+def create_topic():
+    admin = AdminClient(conf)
+    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
+    # create_topics() is asynchronous; wait on the returned futures so the
+    # topic exists before we produce to it
+    futures = admin.create_topics([new_topic])
+    for future in futures.values():
+        future.result()
+
+
+# [create-topic]
+
+
+# [produce]
+def produce():
+    def acked(err, msg):
+        if err is not None:
+            print(f"Failed to deliver message: {msg}: {err}")
+        else:
+            print(
+                f"Message produced: offset={msg.offset()}, "
+                f'key="{msg.key().decode()}", '
+                f'value="{msg.value().decode()}"'
+            )
+
+    producer = Producer(conf)
+    for i in range(5):
+        producer.produce(
+            topic_name,
+            key=b"key " + str(i).encode(),
+            value=b"hello, hstream " + str(i).encode(),
+            on_delivery=acked,
+        )
+    producer.flush()
+
+
+# [produce]
+
+
+# [consume]
+def consume():
+    consumer = Consumer(
+        {
+            **conf,
+            "group.id": group_id,
+            "auto.offset.reset": "smallest",
+            "enable.auto.commit": "false",
+            "isolation.level": "read_uncommitted",
+        }
+    )
+    consumer.subscribe([topic_name])
+    i = 0
+    try:
+        while True:
+            msg = consumer.poll(1.0)
+            if msg is None:
+                # Initial message consumption may take up to
+                # `session.timeout.ms` for the consumer group to
+                # rebalance and start consuming
+                print("Waiting...")
+            elif msg.error():
+                print(f"ERROR: {msg.error()}")
+            else:
+                # Extract the (optional) key and value, and print.
+                print(
+                    f"Consumed topic {msg.topic()}: "
+                    f'key="{msg.key().decode()}", '
+                    f'value="{msg.value().decode()}"'
+                )
+                i += 1
+                if i >= 5:
+                    break
+    except KeyboardInterrupt:
+        pass
+    finally:
+        consumer.close()
+
+
+# [consume]
+
+
+if __name__ == "__main__":
+    try:
+        create_topic()
+        produce()
+        consume()
+    finally:
+        admin = AdminClient(conf)
+        admin.delete_topics([topic_name])
diff --git a/kafka-examples/python/snippets/kafka_python.py b/kafka-examples/python/snippets/kafka_python.py
new file mode 100644
index 0000000..26ced10
--- /dev/null
+++ b/kafka-examples/python/snippets/kafka_python.py
@@ -0,0 +1,72 @@
+# [common]
+
+import os
+from kafka.admin import NewTopic
+from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
+
+# NOTE: Replace with your own host and port
+host = os.getenv("GUIDE_HOST", "127.0.0.1")
+port = os.getenv("GUIDE_PORT", 9092)
+addr = host + ":" + str(port)
+topic_name = "my_topic"
+
+# [common]
+
+
+# [create-topic]
+def create_topic():
+    admin = KafkaAdminClient(bootstrap_servers=addr)
+    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
+    admin.create_topics([topic])
+
+
+# [create-topic]
+
+
+# [produce]
+def produce():
+    producer = KafkaProducer(
+        bootstrap_servers=addr,
+        acks="all",
+        linger_ms=100,
+    )
+    futures = [
+        producer.send(topic_name, b"hello, hstream " + str(i).encode())
+        for i in range(5)
+    ]
+    for future in futures:
+        response = future.get(timeout=10)
+        print("Producer response:", response)
+
+
+# [produce]
+
+
+# [consume]
+def consume():
+    consumer = KafkaConsumer(
+        topic_name,
+        bootstrap_servers=addr,
+        auto_offset_reset="earliest",
+        enable_auto_commit=False,
+        fetch_max_wait_ms=1000,
+    )
+    i = 0
+    for msg in consumer:
+        print("Consumer response", msg)
+        i += 1
+        if i >= 5:
+            # closing alone does not stop the iteration; break explicitly
+            consumer.close()
+            break
+
+
+# [consume]
+
+
+if __name__ == "__main__":
+    try:
+        create_topic()
+        produce()
+        consume()
+    finally:
+        admin = KafkaAdminClient(bootstrap_servers=addr)
+        admin.delete_topics([topic_name])
diff --git a/tests/wait_hstream_kafka.sh b/tests/wait_hstream_kafka.sh
new file mode 100755
index 0000000..0ab5475
--- /dev/null
+++ b/tests/wait_hstream_kafka.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+set -e
+
+# Poll the cluster's Kafka port with `hstream-kafka-cli node status` from a
+# throwaway container until the broker answers, giving up after ~120 seconds.
+# Usage (from the repo root, as in ci.yaml): ./tests/wait_hstream_kafka.sh
+
+timeout=120
+container_name=wait_hstream_kafka
+
+docker run -td --rm --name $container_name --network host hstreamdb/hstream bash
+until (docker exec -t $container_name hstream-kafka-cli --port 9092 node status); do
+    >&2 echo "Waiting for 127.0.0.1:9092 ..."
+    sleep 1
+    timeout=$((timeout - 1))
+    [ $timeout -le 0 ] && >&2 echo "Timeout!" && docker kill $container_name && exit 1
+done
+docker kill $container_name