From 913018e5bcf5e7e9ee5fcea890121e3dfe56bde1 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Tue, 9 Jan 2024 14:52:48 +0800 Subject: [PATCH] add python examples for hstream kafka --- docs/develop-with-kafka-api/_index.md | 2 +- docs/develop-with-kafka-api/compatibility.md | 24 +++++-- docs/develop-with-kafka-api/python.md | 57 ++++++++++++++++ docs/develop/_index.md | 2 +- examples/py/snippets/guides.py | 27 +++++++- include_snippets.py | 20 ++++-- .../python/snippets/confluent_kafka_python.py | 0 .../python/snippets/kafka_python.py | 67 +++++++++++++++++++ 8 files changed, 183 insertions(+), 16 deletions(-) create mode 100644 docs/develop-with-kafka-api/python.md create mode 100644 kafka-examples/python/snippets/confluent_kafka_python.py create mode 100644 kafka-examples/python/snippets/kafka_python.py diff --git a/docs/develop-with-kafka-api/_index.md b/docs/develop-with-kafka-api/_index.md index 69b5f99..396af55 100644 --- a/docs/develop-with-kafka-api/_index.md +++ b/docs/develop-with-kafka-api/_index.md @@ -1,5 +1,5 @@ --- -order: ['compatibility.md', 'java.md'] +order: ['compatibility.md', 'java.md', 'python.md'] collapsed: false --- diff --git a/docs/develop-with-kafka-api/compatibility.md b/docs/develop-with-kafka-api/compatibility.md index 735b583..5ecc334 100644 --- a/docs/develop-with-kafka-api/compatibility.md +++ b/docs/develop-with-kafka-api/compatibility.md @@ -2,24 +2,31 @@ ## Overview -HStream also supports Kafka API since 0.19.0, so users can connect to HStream using Kafka clients. HStream implements [Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not need to change any code in your current Kafka applications, just updating the Kafka URLs in your configurations to point to a HStream cluster, and that is it, then you can start streaming from your Kafka applications to a HStream cluster. +HStream also supports Kafka API since 0.19.0, so users can connect to HStream +using Kafka clients. HStream implements +[Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not +need to change any code in your current Kafka applications, just updating the +Kafka URLs in your configurations to point to a HStream cluster, and that is it, +then you can start streaming from your Kafka applications to a HStream cluster. ::: tip -Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to learn how to enable HStream'support of Kafka API. +Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to +learn how to enable HStream'support of Kafka API. ::: ## Compatibility with Apache Kafka -HStream supports Apache Kafka version 0.11 and later, and most Kafka clients should be able to auto-negotiate protocol versions. +HStream supports Apache Kafka version 0.11 and later, and most Kafka clients +should be able to auto-negotiate protocol versions. Currenty, the clients below are tested by HStream. | Language | Kafka Client | | -------- | ----------------------------------------------------------- | | Java | [Apache Kafka Java Client](https://github.com/apache/kafka) | -| Python | [kafka-python](https://github.com/dpkp/kafka-python) | +| Python | [kafka-python], [confluent-kafka-python] | | Go | [franz-go](https://github.com/twmb/franz-go) | | C/C++ | [librdkafka](https://github.com/confluentinc/librdkafka) | @@ -31,13 +38,18 @@ Recommand using the latest version of each Kafka client ## Features not supported in Apache Kafka -HStream do not support below Kafka features now(we plan to support them in the later version): +HStream do not support below Kafka features now(we plan to support them in the +later version): - Kafka transactions - Quotas in Kafka ::: tip -The configuration of Kafka brokers is not applicable to HStream, as HStream is a completely different implementation. +The configuration of Kafka brokers is not applicable to HStream, as HStream is a +completely different implementation. ::: + +[kafka-python]: https://github.com/dpkp/kafka-python +[confluent-kafka-python]: https://github.com/confluentinc/confluent-kafka-python diff --git a/docs/develop-with-kafka-api/python.md b/docs/develop-with-kafka-api/python.md new file mode 100644 index 0000000..e201d5c --- /dev/null +++ b/docs/develop-with-kafka-api/python.md @@ -0,0 +1,57 @@ +# Develop with Python Kafka client + +This guide will show you how to use Python Kafka client to interact with +HStream. Currenty, we support [kafka-python] and [confluent-kafka]. + +::: info + +The following examples assume that you have Python3.7+ installed. + +::: + +## Installation + +```sh +# If you want to use kafka-python +pip install kafka-python + +# Or if you want to use confluent-kafka +pip install confluent-kafka +``` + +::: tips + +Prefer to use a virtual environment? Check out Python's built-in +[venv](https://docs.python.org/3/library/venv.html). + +::: + +## Create a Topic + +::: code-group + + +@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic + +::: + +## Kafka Producer + +::: code-group + + +@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common produce + +::: + +## Kafka Consumer + +::: code-group + + +@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common consume + +::: + +[kafka-python]: https://github.com/dpkp/kafka-python +[confluent-kafka]: https://github.com/confluentinc/confluent-kafka-python diff --git a/docs/develop/_index.md b/docs/develop/_index.md index 28531c5..3a1bccf 100644 --- a/docs/develop/_index.md +++ b/docs/develop/_index.md @@ -3,4 +3,4 @@ order: ['write', 'receive', 'process', 'ingest-and-distribute'] collapsed: false --- -Develope +Develop diff --git a/examples/py/snippets/guides.py b/examples/py/snippets/guides.py index 0847aee..605de4e 100644 --- a/examples/py/snippets/guides.py +++ b/examples/py/snippets/guides.py @@ -108,7 +108,9 @@ class AppendCallback(hstreamdb.BufferedProducer.AppendCallback): def on_success(self, stream_name, payloads, stream_keyid): self.count += 1 - print(f"Batch {self.count}: Append success with {len(payloads)} payloads.") + print( + f"Batch {self.count}: Append success with {len(payloads)} payloads." + ) def on_fail(self, stream_name, payloads, stream_keyid, e): print("Append failed!") @@ -179,6 +181,25 @@ async def read_reader(client): if __name__ == "__main__": + import functools + + # TODO: the client should be able to retry automatically if server is + # UNAVAILABLE (e.g. NOSEQUENCER) + def retry(async_function, max_retries=10, delay=1): + @functools.wraps(async_function) + async def wrapper(client): + for attempt in range(1, max_retries + 1): + try: + result = await async_function(client) + return result # If successful, return the result + except Exception as e: + print(f"Attempt {attempt} failed: {e}") + await asyncio.sleep(delay) # Wait before the next attempt + + raise ValueError(f"Function failed after {max_retries} attempts") + + return wrapper + def safe_run(fun, *args): try: fun(*args) @@ -190,8 +211,8 @@ def safe_run(fun, *args): main( create_stream, list_streams, - append_records, - buffered_append_records, + retry(append_records), + retry(buffered_append_records), create_subscription, list_subscriptions, subscribe_records, diff --git a/include_snippets.py b/include_snippets.py index 81a0377..77c9253 100755 --- a/include_snippets.py +++ b/include_snippets.py @@ -2,10 +2,10 @@ import os -FILE_TYPES = {".py": "python", "cpp": "cpp"} +FILE_TYPES = {".py": ("python", "[Python]"), ".cpp": ("cpp", "[C++]")} -def run_snippet_cmd(snippets_root, snippet_file, *labels): +def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None): lines = [[] for _ in labels] with open(os.path.join(snippets_root, snippet_file), "r") as f: idx = None @@ -24,12 +24,22 @@ def run_snippet_cmd(snippets_root, snippet_file, *labels): lines[idx].append(line) # TODO: strip indent extension = os.path.splitext(snippet_file)[1] - ft = FILE_TYPES[extension] + (ft, _code_group) = FILE_TYPES[extension] + code_group = _code_group if code_group is None else code_group blocks = "\n\n\n".join("".join(xs).strip() for xs in lines) - return f"```{ft} [Python]\n{blocks}\n```\n" + return f"```{ft} {code_group}\n{blocks}\n```\n" -support_commands = [("@snippet", run_snippet_cmd)] +def run_snippet_cmd_group(snippets_root, snippet_file, group, *labels): + return run_snippet_cmd( + snippets_root, snippet_file, *labels, code_group=group + ) + + +support_commands = [ + ("@snippet", run_snippet_cmd), + ("@snippet_group", run_snippet_cmd_group), +] def parse_commands(lines): diff --git a/kafka-examples/python/snippets/confluent_kafka_python.py b/kafka-examples/python/snippets/confluent_kafka_python.py new file mode 100644 index 0000000..e69de29 diff --git a/kafka-examples/python/snippets/kafka_python.py b/kafka-examples/python/snippets/kafka_python.py new file mode 100644 index 0000000..b296864 --- /dev/null +++ b/kafka-examples/python/snippets/kafka_python.py @@ -0,0 +1,67 @@ +# [common] + +import os +from kafka.admin import NewTopic +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer + +# NOTE: Replace with your own host and port +host = os.getenv("GUIDE_HOST", "127.0.0.1") +port = os.getenv("GUIDE_PORT", 9092) +topic_name = "my_topic" + +# [common] + + +# [create-topic] +def create_topic(): + admin = KafkaAdminClient(bootstrap_servers=f"{host}:{port}") + topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1) + admin.create_topics([topic]) + + +# [create-topic] + + +# [produce] +def produce(): + producer = KafkaProducer( + bootstrap_servers=f"{host}:{port}", + acks="all", + linger_ms=100, + ) + futures = [ + producer.send(topic_name, b"hello, hstream " + str(i).encode()) + for i in range(5) + ] + for future in futures: + response = future.get(timeout=10) + print("Producer response:", response) + + +# [produce] + + +# [consume] +def consume(): + consumer = KafkaConsumer( + topic_name, + bootstrap_servers=f"{host}:{port}", + auto_offset_reset="earliest", + enable_auto_commit=False, + fetch_max_wait_ms=1000, + ) + i = 0 + for msg in consumer: + print("Consumer response", msg) + i += 1 + if i >= 5: + consumer.close() + + +# [consume] + + +if __name__ == "__main__": + create_topic() + produce() + consume()