diff --git a/docs/develop-with-kafka-api/_index.md b/docs/develop-with-kafka-api/_index.md
index 69b5f99..396af55 100644
--- a/docs/develop-with-kafka-api/_index.md
+++ b/docs/develop-with-kafka-api/_index.md
@@ -1,5 +1,5 @@
 ---
-order: ['compatibility.md', 'java.md']
+order: ['compatibility.md', 'java.md', 'python.md']
 collapsed: false
 ---
 
diff --git a/docs/develop-with-kafka-api/compatibility.md b/docs/develop-with-kafka-api/compatibility.md
index 735b583..5ecc334 100644
--- a/docs/develop-with-kafka-api/compatibility.md
+++ b/docs/develop-with-kafka-api/compatibility.md
@@ -2,24 +2,31 @@
 
 ## Overview
 
-HStream also supports Kafka API since 0.19.0, so users can connect to HStream using Kafka clients. HStream implements [Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not need to change any code in your current Kafka applications, just updating the Kafka URLs in your configurations to point to a HStream cluster, and that is it, then you can start streaming from your Kafka applications to a HStream cluster.
+Since 0.19.0, HStream has also supported the Kafka API, so users can connect
+to HStream using Kafka clients. HStream implements the
+[Kafka protocol](https://kafka.apache.org/protocol.html) natively, so you do
+not need to change any code in your existing Kafka applications: just update
+the Kafka URLs in your configurations to point to an HStream cluster, and you
+can start streaming from your Kafka applications to HStream.
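+
+For example, pointing an existing [kafka-python](https://github.com/dpkp/kafka-python)
+producer at HStream only requires changing the bootstrap servers. A minimal
+sketch (the address below is a placeholder for your own cluster):
+
+```python
+from kafka import KafkaProducer
+
+# Point the client at an HStream cluster instead of a Kafka cluster;
+# no other code changes are needed.
+producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
+producer.send("my_topic", b"hello, hstream")
+producer.flush()
+```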
 
 ::: tip
 
-Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to learn how to enable HStream'support of Kafka API.
+Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md)
+to learn how to enable HStream's support for the Kafka API.
 
 :::
 
 ## Compatibility with Apache Kafka
 
-HStream supports Apache Kafka version 0.11 and later, and most Kafka clients should be able to auto-negotiate protocol versions.
+HStream supports Apache Kafka version 0.11 and later, and most Kafka clients
+should be able to auto-negotiate protocol versions.
 
-Currenty, the clients below are tested by HStream.
+Currently, the clients below have been tested with HStream.
 
 | Language | Kafka Client                                                 |
 | -------- | ------------------------------------------------------------ |
 | Java     | [Apache Kafka Java Client](https://github.com/apache/kafka)  |
-| Python   | [kafka-python](https://github.com/dpkp/kafka-python)         |
+| Python   | [kafka-python], [confluent-kafka-python]                     |
 | Go       | [franz-go](https://github.com/twmb/franz-go)                 |
 | C/C++    | [librdkafka](https://github.com/confluentinc/librdkafka)     |
 
@@ -31,13 +38,18 @@ Recommand using the latest version of each Kafka client
 
 ## Features not supported in Apache Kafka
 
-HStream do not support below Kafka features now(we plan to support them in the later version):
+HStream does not support the following Kafka features yet (we plan to support
+them in a later version):
 
 - Kafka transactions
 - Quotas in Kafka
 
 ::: tip
 
-The configuration of Kafka brokers is not applicable to HStream, as HStream is a completely different implementation.
+The configuration of Kafka brokers is not applicable to HStream, as HStream
+is a completely different implementation.
 
 :::
+
+[kafka-python]: https://github.com/dpkp/kafka-python
+[confluent-kafka-python]: https://github.com/confluentinc/confluent-kafka-python
diff --git a/docs/develop-with-kafka-api/python.md b/docs/develop-with-kafka-api/python.md
new file mode 100644
index 0000000..439f035
--- /dev/null
+++ b/docs/develop-with-kafka-api/python.md
@@ -0,0 +1,60 @@
+# Develop with Python Kafka client
+
+This guide shows you how to use a Python Kafka client to interact with
+HStream. Currently, we support [kafka-python] and [confluent-kafka].
+
+## Installation
+
+```sh
+# If you want to use kafka-python
+pip install kafka-python
+
+# Or if you want to use confluent-kafka
+pip install confluent-kafka
+```
+
+::: tip
+
+Prefer to use a virtual environment? Check out Python's built-in
+[venv](https://docs.python.org/3/library/venv.html).
+
+:::
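+
+Once a client is installed, a quick way to check connectivity is to list the
+topics the cluster exposes. A minimal sketch with kafka-python (assuming
+HStream's Kafka listener is reachable at `127.0.0.1:9092`; adjust the address
+to your deployment):
+
+```python
+from kafka import KafkaConsumer
+
+# A successful call confirms the client can reach the HStream cluster.
+consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092")
+print(consumer.topics())
+consumer.close()
+```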
+
+## Create a Topic
+
+::: code-group
+
+
+@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic
+
+
+@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common create-topic
+
+:::
+
+## Produce Records
+
+::: code-group
+
+
+@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common produce
+
+
+@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common produce
+
+:::
+
+## Consume Records
+
+::: code-group
+
+
+@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common consume
+
+
+@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common consume
+
+:::
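+
+When you are done experimenting, you can delete the topic again, mirroring the
+cleanup step in the snippet files. A minimal sketch with kafka-python (`addr`
+and `topic_name` are the values defined in the common snippet above):
+
+```python
+from kafka import KafkaAdminClient
+
+# Remove the example topic; `addr` and `topic_name` come from the
+# common snippet above.
+admin = KafkaAdminClient(bootstrap_servers=addr)
+admin.delete_topics([topic_name])
+```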
+
+[kafka-python]: https://github.com/dpkp/kafka-python
+[confluent-kafka]: https://github.com/confluentinc/confluent-kafka-python
diff --git a/docs/develop/_index.md b/docs/develop/_index.md
index 28531c5..3a1bccf 100644
--- a/docs/develop/_index.md
+++ b/docs/develop/_index.md
@@ -3,4 +3,4 @@ order: ['write', 'receive', 'process', 'ingest-and-distribute']
 collapsed: false
 ---
 
-Develope
+Develop
diff --git a/embed.sh b/embed.sh
index 6c41452..1c49ad0 100755
--- a/embed.sh
+++ b/embed.sh
@@ -4,4 +4,5 @@
 if [[ "$EMBED_ME_ARGS" == "" ]]; then
     ./include_snippets.py --snippets-root . --file-pattern "docs/**/write/*.md"
     ./include_snippets.py --snippets-root . --file-pattern "docs/**/receive/*.md"
+    ./include_snippets.py --snippets-root . --file-pattern "docs/**/python.md"
 fi
diff --git a/include_snippets.py b/include_snippets.py
index 81a0377..77c9253 100755
--- a/include_snippets.py
+++ b/include_snippets.py
@@ -2,10 +2,10 @@
 
 import os
 
-FILE_TYPES = {".py": "python", "cpp": "cpp"}
+FILE_TYPES = {".py": ("python", "[Python]"), ".cpp": ("cpp", "[C++]")}
 
 
-def run_snippet_cmd(snippets_root, snippet_file, *labels):
+def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None):
     lines = [[] for _ in labels]
     with open(os.path.join(snippets_root, snippet_file), "r") as f:
         idx = None
@@ -24,12 +24,22 @@ def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None):
             lines[idx].append(line)  # TODO: strip indent
 
     extension = os.path.splitext(snippet_file)[1]
-    ft = FILE_TYPES[extension]
+    (ft, _code_group) = FILE_TYPES[extension]
+    code_group = _code_group if code_group is None else code_group
     blocks = "\n\n\n".join("".join(xs).strip() for xs in lines)
-    return f"```{ft} [Python]\n{blocks}\n```\n"
+    return f"```{ft} {code_group}\n{blocks}\n```\n"
 
 
-support_commands = [("@snippet", run_snippet_cmd)]
+def run_snippet_cmd_group(snippets_root, snippet_file, group, *labels):
+    return run_snippet_cmd(
+        snippets_root, snippet_file, *labels, code_group=group
+    )
+
+
+support_commands = [
+    ("@snippet", run_snippet_cmd),
+    ("@snippet_group", run_snippet_cmd_group),
+]
 
 
 def parse_commands(lines):
diff --git a/kafka-examples/python/snippets/confluent_kafka_python.py b/kafka-examples/python/snippets/confluent_kafka_python.py
new file mode 100644
index 0000000..377d038
--- /dev/null
+++ b/kafka-examples/python/snippets/confluent_kafka_python.py
@@ -0,0 +1,106 @@
+# [common]
+
+import os
+from confluent_kafka import Producer, Consumer
+from confluent_kafka.admin import AdminClient, NewTopic
+
+# NOTE: Replace with your own host and port
+host = os.getenv("GUIDE_HOST", "127.0.0.1")
+port = os.getenv("GUIDE_PORT", 9092)
+
+topic_name = "my_topic"
+group_id = "confluent_kafka_group"
+
+conf = {
+    "bootstrap.servers": host + ":" + str(port),
+    "client.id": "confluent_kafka_client",
+}
+# [common]
+
+
+# [create-topic]
+def create_topic():
+    admin = AdminClient(conf)
+    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
+    admin.create_topics([new_topic])
+
+
+# [create-topic]
+
+
+# [produce]
+def produce():
+    def acked(err, msg):
+        if err is not None:
+            print(f"Failed to deliver message: {msg}: {err}")
+        else:
+            print(
+                f"Message produced: offset={msg.offset()}, "
+                f'key="{msg.key().decode()}", '
+                f'value="{msg.value().decode()}"'
+            )
+
+    producer = Producer(conf)
+    for i in range(5):
+        producer.produce(
+            topic_name,
+            key=b"key " + str(i).encode(),
+            value=b"hello, hstream " + str(i).encode(),
+            on_delivery=acked,
+        )
+    producer.flush()
+
+
+# [produce]
+
+
+# [consume]
+def consume():
+    consumer = Consumer(
+        {
+            **conf,
+            "group.id": group_id,
+            "auto.offset.reset": "smallest",
+            "enable.auto.commit": "false",
+            "isolation.level": "read_uncommitted",
+        }
+    )
+    consumer.subscribe([topic_name])
+    i = 0
+    try:
+        while True:
+            msg = consumer.poll(1.0)
+            if msg is None:
+                # Initial message consumption may take up to
+                # `session.timeout.ms` for the consumer group to
+                # rebalance and start consuming
+                print("Waiting...")
+            elif msg.error():
+                print(f"ERROR: {msg.error()}")
+            else:
+                # Extract the (optional) key and value, and print.
+                print(
+                    f"Consumed topic {msg.topic()}: "
+                    f'key="{msg.key().decode()}", '
+                    f'value="{msg.value().decode()}"'
+                )
+                i += 1
+                if i >= 5:
+                    break
+    except KeyboardInterrupt:
+        pass
+    finally:
+        consumer.close()
+
+
+# [consume]
+
+
+if __name__ == "__main__":
+    try:
+        create_topic()
+        produce()
+        consume()
+    finally:
+        admin = AdminClient(conf)
+        admin.delete_topics([topic_name])
diff --git a/kafka-examples/python/snippets/kafka_python.py b/kafka-examples/python/snippets/kafka_python.py
new file mode 100644
index 0000000..26ced10
--- /dev/null
+++ b/kafka-examples/python/snippets/kafka_python.py
@@ -0,0 +1,72 @@
+# [common]
+
+import os
+from kafka.admin import NewTopic
+from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
+
+# NOTE: Replace with your own host and port
+host = os.getenv("GUIDE_HOST", "127.0.0.1")
+port = os.getenv("GUIDE_PORT", 9092)
+addr = host + ":" + str(port)
+topic_name = "my_topic"
+
+# [common]
+
+
+# [create-topic]
+def create_topic():
+    admin = KafkaAdminClient(bootstrap_servers=addr)
+    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
+    admin.create_topics([topic])
+
+
+# [create-topic]
+
+
+# [produce]
+def produce():
+    producer = KafkaProducer(
+        bootstrap_servers=addr,
+        acks="all",
+        linger_ms=100,
+    )
+    futures = [
+        producer.send(topic_name, b"hello, hstream " + str(i).encode())
+        for i in range(5)
+    ]
+    for future in futures:
+        response = future.get(timeout=10)
+        print("Producer response:", response)
+
+
+# [produce]
+
+
+# [consume]
+def consume():
+    consumer = KafkaConsumer(
+        topic_name,
+        bootstrap_servers=addr,
+        auto_offset_reset="earliest",
+        enable_auto_commit=False,
+        fetch_max_wait_ms=1000,
+    )
+    i = 0
+    for msg in consumer:
+        print("Consumer response", msg)
+        i += 1
+        if i >= 5:
+            # Stop after five records; close the consumer before leaving
+            # the loop so we do not keep iterating on a closed client.
+            consumer.close()
+            break
+
+
+# [consume]
+
+
+if __name__ == "__main__":
+    try:
+        create_topic()
+        produce()
+        consume()
+    finally:
+        admin = KafkaAdminClient(bootstrap_servers=addr)
+        admin.delete_topics([topic_name])