add python examples for hstream kafka
4eUeP committed Jan 10, 2024
1 parent 0ee5d75 commit a50827f
Showing 8 changed files with 274 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/develop-with-kafka-api/_index.md
@@ -1,5 +1,5 @@
---
order: ['compatibility.md', 'java.md']
order: ['compatibility.md', 'java.md', 'python.md']
collapsed: false
---

24 changes: 18 additions & 6 deletions docs/develop-with-kafka-api/compatibility.md
@@ -2,24 +2,31 @@

## Overview

Since version 0.19.0, HStream has also supported the Kafka API, so users can connect to HStream using Kafka clients. HStream implements the [Kafka protocol](https://kafka.apache.org/protocol.html) under the hood, so you do not need to change any code in your current Kafka applications: just update the Kafka URLs in your configurations to point to an HStream cluster, and you can start streaming from your Kafka applications to HStream.
Since version 0.19.0, HStream has also supported the Kafka API, so users can
connect to HStream using Kafka clients. HStream implements the
[Kafka protocol](https://kafka.apache.org/protocol.html) under the hood, so you
do not need to change any code in your current Kafka applications: just update
the Kafka URLs in your configurations to point to an HStream cluster, and you
can start streaming from your Kafka applications to HStream.
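
For example, switching an existing kafka-python producer over to HStream is
just a matter of the bootstrap address. A minimal sketch, assuming an HStream
cluster is reachable at `127.0.0.1:9092` (replace with your own address):

```python
from kafka import KafkaProducer

# The only change from a plain Kafka setup is the bootstrap address,
# which now points at the HStream cluster.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
producer.send("my_topic", b"hello, hstream")
producer.flush()
```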

::: tip

Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to learn how to enable HStream's support of the Kafka API.
Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to
learn how to enable HStream's support of the Kafka API.

:::

## Compatibility with Apache Kafka

HStream supports Apache Kafka version 0.11 and later, and most Kafka clients should be able to auto-negotiate protocol versions.
HStream supports Apache Kafka version 0.11 and later, and most Kafka clients
should be able to auto-negotiate protocol versions.
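
If a client cannot negotiate a version automatically, most clients allow
pinning the protocol version explicitly. A minimal sketch with kafka-python
(the pinned version here is illustrative):

```python
from kafka import KafkaProducer

# Skip auto-probing and pin the protocol version; HStream supports
# Kafka protocol version 0.11 and later.
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    api_version=(0, 11),
)
```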

Currently, the following clients have been tested with HStream.

| Language | Kafka Client |
| -------- | ----------------------------------------------------------- |
| Java | [Apache Kafka Java Client](https://github.com/apache/kafka) |
| Python | [kafka-python](https://github.com/dpkp/kafka-python) |
| Python | [kafka-python], [confluent-kafka-python] |
| Go | [franz-go](https://github.com/twmb/franz-go) |
| C/C++ | [librdkafka](https://github.com/confluentinc/librdkafka) |

@@ -31,13 +38,18 @@ Recommend using the latest version of each Kafka client

## Apache Kafka features not yet supported

HStream does not support the following Kafka features yet (we plan to support them in a later version):
HStream does not support the following Kafka features yet (we plan to support
them in a later version):

- Kafka transactions
- Quotas in Kafka
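
For instance, a confluent-kafka producer configured for transactions would not
work against HStream; the sketch below is a hypothetical illustration of a
configuration to avoid:

```python
from confluent_kafka import Producer

# Hypothetical example of what NOT to do against HStream: the
# transactional.id setting relies on Kafka transactions, which HStream
# does not implement yet, so init_transactions() would fail.
producer = Producer(
    {
        "bootstrap.servers": "127.0.0.1:9092",
        "transactional.id": "my-transactional-producer",
    }
)
# producer.init_transactions()  # requires Kafka transaction support
```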

::: tip

The configuration of Kafka brokers is not applicable to HStream, as HStream is a completely different implementation.
The configuration of Kafka brokers is not applicable to HStream, as HStream is a
completely different implementation.

:::

[kafka-python]: https://github.com/dpkp/kafka-python
[confluent-kafka-python]: https://github.com/confluentinc/confluent-kafka-python
60 changes: 60 additions & 0 deletions docs/develop-with-kafka-api/python.md
@@ -0,0 +1,60 @@
# Develop with Python Kafka client

This guide shows you how to use Python Kafka clients to interact with HStream.
Currently, [kafka-python] and [confluent-kafka] are supported.

## Installation

```sh
# If you want to use kafka-python
pip install kafka-python

# Or if you want to use confluent-kafka
pip install confluent-kafka
```

::: tip

Prefer to use a virtual environment? Check out Python's built-in
[venv](https://docs.python.org/3/library/venv.html).
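
For example, a minimal setup (assuming Python 3 is installed):

```sh
python3 -m venv .venv
source .venv/bin/activate
pip install kafka-python confluent-kafka
```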

:::

## Create a Topic

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common create-topic

:::

## Produce Records

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common produce

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common produce

:::

## Consume Records

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common consume

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common consume

:::

[kafka-python]: https://github.com/dpkp/kafka-python
[confluent-kafka]: https://github.com/confluentinc/confluent-kafka-python
2 changes: 1 addition & 1 deletion docs/develop/_index.md
@@ -3,4 +3,4 @@ order: ['write', 'receive', 'process', 'ingest-and-distribute']
collapsed: false
---

Develope
Develop
1 change: 1 addition & 0 deletions embed.sh
@@ -4,4 +4,5 @@
if [[ "$EMBED_ME_ARGS" == "" ]]; then
./include_snippets.py --snippets-root . --file-pattern "docs/**/write/*.md"
./include_snippets.py --snippets-root . --file-pattern "docs/**/receive/*.md"
./include_snippets.py --snippets-root . --file-pattern "docs/**/python.md"
fi
20 changes: 15 additions & 5 deletions include_snippets.py
@@ -2,10 +2,10 @@

import os

FILE_TYPES = {".py": "python", "cpp": "cpp"}
FILE_TYPES = {".py": ("python", "[Python]"), ".cpp": ("cpp", "[C++]")}


def run_snippet_cmd(snippets_root, snippet_file, *labels):
def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None):
lines = [[] for _ in labels]
with open(os.path.join(snippets_root, snippet_file), "r") as f:
idx = None
@@ -24,12 +24,22 @@ def run_snippet_cmd(snippets_root, snippet_file, *labels):
lines[idx].append(line)
# TODO: strip indent
extension = os.path.splitext(snippet_file)[1]
ft = FILE_TYPES[extension]
(ft, _code_group) = FILE_TYPES[extension]
code_group = _code_group if code_group is None else code_group
blocks = "\n\n\n".join("".join(xs).strip() for xs in lines)
return f"```{ft} [Python]\n{blocks}\n```\n"
return f"```{ft} {code_group}\n{blocks}\n```\n"


support_commands = [("@snippet", run_snippet_cmd)]
def run_snippet_cmd_group(snippets_root, snippet_file, group, *labels):
return run_snippet_cmd(
snippets_root, snippet_file, *labels, code_group=group
)


support_commands = [
("@snippet", run_snippet_cmd),
("@snippet_group", run_snippet_cmd_group),
]


def parse_commands(lines):
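For reference, a sketch of how the new `@snippet_group` command behaves,
assuming `include_snippets.py` is importable as a module (the labels and path
mirror the docs above):

```python
from include_snippets import run_snippet_cmd_group

# Render the [common] and [produce] snippets from the kafka-python example
# into one fenced block tagged "[kafka-python]" for a VitePress code group.
block = run_snippet_cmd_group(
    ".",  # snippets root
    "kafka-examples/python/snippets/kafka_python.py",
    "[kafka-python]",  # code-group tab label
    "common",
    "produce",
)
print(block)  # the output starts with: ```python [kafka-python]
```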
106 changes: 106 additions & 0 deletions kafka-examples/python/snippets/confluent_kafka_python.py
@@ -0,0 +1,106 @@
# [common]

import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
"bootstrap.servers": host + ":" + str(port),
"client.id": "confluent_kafka_client",
}
# [common]


# [create-topic]
def create_topic():
admin = AdminClient(conf)
new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    # create_topics() is asynchronous; wait for the returned future so the
    # topic exists before we produce to it
    fs = admin.create_topics([new_topic])
    fs[topic_name].result()


# [create-topic]


# [produce]
def produce():
def acked(err, msg):
if err is not None:
print(f"Failed to deliver message: {msg}: {err}")
else:
print(
f"Message produced: offset={msg.offset()}, "
f'key="{msg.key().decode()}", '
f'value="{msg.value().decode()}"'
)

producer = Producer(conf)
for i in range(5):
producer.produce(
topic_name,
key=b"key " + str(i).encode(),
value=b"hello, hstream " + str(i).encode(),
on_delivery=acked,
)
producer.flush()


# [produce]


# [consume]
def consume():
consumer = Consumer(
{
**conf,
"group.id": group_id,
"auto.offset.reset": "smallest",
"enable.auto.commit": "false",
"isolation.level": "read_uncommitted",
}
)
consumer.subscribe([topic_name])
i = 0
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
# Initial message consumption may take up to
# `session.timeout.ms` for the consumer group to
# rebalance and start consuming
print("Waiting...")
elif msg.error():
print(f"ERROR: {msg.error()}")
else:
# Extract the (optional) key and value, and print.
print(
f"Consumed topic {msg.topic()}: "
f'key="{msg.key().decode()}", '
f'value="{msg.value().decode()}"'
)
i += 1
if i >= 5:
break
except KeyboardInterrupt:
pass
finally:
consumer.close()


# [consume]


if __name__ == "__main__":
try:
create_topic()
produce()
consume()
finally:
admin = AdminClient(conf)
admin.delete_topics([topic_name])
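A possible way to run this example end to end, assuming the file is saved as
`confluent_kafka_python.py` (the env var names come from the snippet itself):

```sh
GUIDE_HOST=127.0.0.1 GUIDE_PORT=9092 python confluent_kafka_python.py
```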
72 changes: 72 additions & 0 deletions kafka-examples/python/snippets/kafka_python.py
@@ -0,0 +1,72 @@
# [common]

import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
addr = host + ":" + str(port)
topic_name = "my_topic"

# [common]


# [create-topic]
def create_topic():
admin = KafkaAdminClient(bootstrap_servers=addr)
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
admin.create_topics([topic])


# [create-topic]


# [produce]
def produce():
producer = KafkaProducer(
bootstrap_servers=addr,
acks="all",
linger_ms=100,
)
futures = [
producer.send(topic_name, b"hello, hstream " + str(i).encode())
for i in range(5)
]
for future in futures:
response = future.get(timeout=10)
print("Producer response:", response)


# [produce]


# [consume]
def consume():
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=addr,
auto_offset_reset="earliest",
enable_auto_commit=False,
fetch_max_wait_ms=1000,
)
i = 0
for msg in consumer:
print("Consumer response", msg)
i += 1
if i >= 5:
            consumer.close()
            break


# [consume]


if __name__ == "__main__":
try:
create_topic()
produce()
consume()
finally:
admin = KafkaAdminClient(bootstrap_servers=addr)
admin.delete_topics([topic_name])
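
Likewise, this example can be run directly, assuming it is saved as
`kafka_python.py`:

```sh
GUIDE_HOST=127.0.0.1 GUIDE_PORT=9092 python kafka_python.py
```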
