Skip to content

Commit

Permalink
add python examples for hstream kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Jan 9, 2024
1 parent 51b8b94 commit 913018e
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/develop-with-kafka-api/_index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
order: ['compatibility.md', 'java.md']
order: ['compatibility.md', 'java.md', 'python.md']
collapsed: false
---

Expand Down
24 changes: 18 additions & 6 deletions docs/develop-with-kafka-api/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@

## Overview

HStream also supports Kafka API since 0.19.0, so users can connect to HStream using Kafka clients. HStream implements [Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not need to change any code in your current Kafka applications, just updating the Kafka URLs in your configurations to point to a HStream cluster, and that is it, then you can start streaming from your Kafka applications to a HStream cluster.
HStream also supports Kafka API since 0.19.0, so users can connect to HStream
using Kafka clients. HStream implements
[Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not
need to change any code in your current Kafka applications, just updating the
Kafka URLs in your configurations to point to a HStream cluster, and that is it,
then you can start streaming from your Kafka applications to a HStream cluster.

::: tip

Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to learn how to enable HStream'support of Kafka API.
Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to
learn how to enable HStream'support of Kafka API.

:::

## Compatibility with Apache Kafka

HStream supports Apache Kafka version 0.11 and later, and most Kafka clients should be able to auto-negotiate protocol versions.
HStream supports Apache Kafka version 0.11 and later, and most Kafka clients
should be able to auto-negotiate protocol versions.

Currenty, the clients below are tested by HStream.

| Language | Kafka Client |
| -------- | ----------------------------------------------------------- |
| Java | [Apache Kafka Java Client](https://github.com/apache/kafka) |
| Python | [kafka-python](https://github.com/dpkp/kafka-python) |
| Python | [kafka-python], [confluent-kafka-python] |
| Go | [franz-go](https://github.com/twmb/franz-go) |
| C/C++ | [librdkafka](https://github.com/confluentinc/librdkafka) |

Expand All @@ -31,13 +38,18 @@ Recommand using the latest version of each Kafka client

## Features not supported in Apache Kafka

HStream do not support below Kafka features now(we plan to support them in the later version):
HStream do not support below Kafka features now(we plan to support them in the
later version):

- Kafka transactions
- Quotas in Kafka

::: tip

The configuration of Kafka brokers is not applicable to HStream, as HStream is a completely different implementation.
The configuration of Kafka brokers is not applicable to HStream, as HStream is a
completely different implementation.

:::

[kafka-python]: https://github.com/dpkp/kafka-python
[confluent-kafka-python]: https://github.com/confluentinc/confluent-kafka-python
57 changes: 57 additions & 0 deletions docs/develop-with-kafka-api/python.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Develop with Python Kafka client

This guide will show you how to use Python Kafka client to interact with
HStream. Currenty, we support [kafka-python] and [confluent-kafka].

::: info

The following examples assume that you have Python3.7+ installed.

:::

## Installation

```sh
# If you want to use kafka-python
pip install kafka-python

# Or if you want to use confluent-kafka
pip install confluent-kafka
```

::: tips

Prefer to use a virtual environment? Check out Python's built-in
[venv](https://docs.python.org/3/library/venv.html).

:::

## Create a Topic

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic

:::

## Kafka Producer

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common produce

:::

## Kafka Consumer

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common consume

:::

[kafka-python]: https://github.com/dpkp/kafka-python
[confluent-kafka]: https://github.com/confluentinc/confluent-kafka-python
2 changes: 1 addition & 1 deletion docs/develop/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ order: ['write', 'receive', 'process', 'ingest-and-distribute']
collapsed: false
---

Develope
Develop
27 changes: 24 additions & 3 deletions examples/py/snippets/guides.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class AppendCallback(hstreamdb.BufferedProducer.AppendCallback):

def on_success(self, stream_name, payloads, stream_keyid):
self.count += 1
print(f"Batch {self.count}: Append success with {len(payloads)} payloads.")
print(
f"Batch {self.count}: Append success with {len(payloads)} payloads."
)

def on_fail(self, stream_name, payloads, stream_keyid, e):
print("Append failed!")
Expand Down Expand Up @@ -179,6 +181,25 @@ async def read_reader(client):


if __name__ == "__main__":
import functools

# TODO: the client should be able to retry automatically if server is
# UNAVAILABLE (e.g. NOSEQUENCER)
def retry(async_function, max_retries=10, delay=1):
@functools.wraps(async_function)
async def wrapper(client):
for attempt in range(1, max_retries + 1):
try:
result = await async_function(client)
return result # If successful, return the result
except Exception as e:
print(f"Attempt {attempt} failed: {e}")
await asyncio.sleep(delay) # Wait before the next attempt

raise ValueError(f"Function failed after {max_retries} attempts")

return wrapper

def safe_run(fun, *args):
try:
fun(*args)
Expand All @@ -190,8 +211,8 @@ def safe_run(fun, *args):
main(
create_stream,
list_streams,
append_records,
buffered_append_records,
retry(append_records),
retry(buffered_append_records),
create_subscription,
list_subscriptions,
subscribe_records,
Expand Down
20 changes: 15 additions & 5 deletions include_snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import os

FILE_TYPES = {".py": "python", "cpp": "cpp"}
FILE_TYPES = {".py": ("python", "[Python]"), ".cpp": ("cpp", "[C++]")}


def run_snippet_cmd(snippets_root, snippet_file, *labels):
def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None):
lines = [[] for _ in labels]
with open(os.path.join(snippets_root, snippet_file), "r") as f:
idx = None
Expand All @@ -24,12 +24,22 @@ def run_snippet_cmd(snippets_root, snippet_file, *labels):
lines[idx].append(line)
# TODO: strip indent
extension = os.path.splitext(snippet_file)[1]
ft = FILE_TYPES[extension]
(ft, _code_group) = FILE_TYPES[extension]
code_group = _code_group if code_group is None else code_group
blocks = "\n\n\n".join("".join(xs).strip() for xs in lines)
return f"```{ft} [Python]\n{blocks}\n```\n"
return f"```{ft} {code_group}\n{blocks}\n```\n"


support_commands = [("@snippet", run_snippet_cmd)]
def run_snippet_cmd_group(snippets_root, snippet_file, group, *labels):
return run_snippet_cmd(
snippets_root, snippet_file, *labels, code_group=group
)


support_commands = [
("@snippet", run_snippet_cmd),
("@snippet_group", run_snippet_cmd_group),
]


def parse_commands(lines):
Expand Down
Empty file.
67 changes: 67 additions & 0 deletions kafka-examples/python/snippets/kafka_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# [common]

import os
from kafka.admin import NewTopic
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)
topic_name = "my_topic"

# [common]


# [create-topic]
def create_topic():
admin = KafkaAdminClient(bootstrap_servers=f"{host}:{port}")
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
admin.create_topics([topic])


# [create-topic]


# [produce]
def produce():
producer = KafkaProducer(
bootstrap_servers=f"{host}:{port}",
acks="all",
linger_ms=100,
)
futures = [
producer.send(topic_name, b"hello, hstream " + str(i).encode())
for i in range(5)
]
for future in futures:
response = future.get(timeout=10)
print("Producer response:", response)


# [produce]


# [consume]
def consume():
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=f"{host}:{port}",
auto_offset_reset="earliest",
enable_auto_commit=False,
fetch_max_wait_ms=1000,
)
i = 0
for msg in consumer:
print("Consumer response", msg)
i += 1
if i >= 5:
consumer.close()


# [consume]


if __name__ == "__main__":
create_topic()
produce()
consume()

0 comments on commit 913018e

Please sign in to comment.