add python examples for hstream kafka #49

Merged · 2 commits · Jan 10, 2024
22 changes: 21 additions & 1 deletion .github/workflows/ci.yaml
@@ -121,7 +121,27 @@ jobs:
          ./tests/wait_hserver.sh
          sleep 5
-      - name: Run examples
+      - name: Run hstream examples
        run: |
          python3 -m pip install hstreamdb
          python3 examples/py/snippets/guides.py
+  test-python-kafka-examples:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Start cluster
+        run: |
+          docker compose -f ./assets/quick-start-kafka-lite.yaml up -d
+          sleep 5
+          ./tests/wait_hstream_kafka.sh
+          sleep 5
+      - name: Run hstream kafka examples
+        run: |
+          python3 -m pip install kafka-python
+          python3 kafka-examples/python/snippets/kafka_python.py
+          python3 -m pip install confluent-kafka
+          python3 kafka-examples/python/snippets/confluent_kafka_python.py
74 changes: 74 additions & 0 deletions assets/quick-start-kafka-lite.yaml
@@ -0,0 +1,74 @@
version: "3.5"

services:
hserver:
image: hstreamdb/hstream:latest
depends_on:
- zookeeper
- hstore
ports:
- "127.0.0.1:9092:9092"
expose:
- 9092
networks:
- hstream-quickstart
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /tmp:/tmp
- data_store:/data/store
command:
- bash
- "-c"
- |
set -e
/usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600 \
/usr/local/bin/hstream-server kafka \
--bind-address 0.0.0.0 --port 9092 \
--gossip-port 6571 \
--server-id 100 \
--seed-nodes "$$(hostname -I | awk '{print $$1}'):6571" \
--advertised-address $$(hostname -I | awk '{print $$1}') \
--metastore-uri zk://zookeeper:2181 \
--store-config /data/store/logdevice.conf \
--store-log-level warning
hstore:
image: hstreamdb/hstream:latest
networks:
- hstream-quickstart
volumes:
- data_store:/data/store
command:
- bash
- "-c"
- |
set -ex
# N.B. "enable-dscp-reflection=false" is required for linux kernel which
# doesn't support dscp reflection, e.g. centos7.
/usr/local/bin/ld-dev-cluster --root /data/store \
--use-tcp --tcp-host $$(hostname -I | awk '{print $$1}') \
--user-admin-port 6440 \
--param enable-dscp-reflection=false \
--no-interactive
zookeeper:
image: zookeeper:3.7
expose:
- 2181
networks:
- hstream-quickstart
volumes:
- data_zk_data:/data
- data_zk_datalog:/datalog

networks:
hstream-quickstart:
name: hstream-quickstart

volumes:
data_store:
name: quickstart_data_store
data_zk_data:
name: quickstart_data_zk_data
data_zk_datalog:
name: quickstart_data_zk_datalog
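Once `docker compose -f ./assets/quick-start-kafka-lite.yaml up -d` is running, a quick way to confirm the Kafka listener is reachable is a short kafka-python check; this is an illustrative sketch, not part of the PR:

```python
from kafka import KafkaAdminClient

# The compose file above publishes the broker on 127.0.0.1:9092.
admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
print(admin.list_topics())  # expect an empty list on a fresh cluster
admin.close()
```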
2 changes: 1 addition & 1 deletion docs/develop-with-kafka-api/_index.md
@@ -1,5 +1,5 @@
---
-order: ['compatibility.md', 'java.md']
+order: ['compatibility.md', 'java.md', 'python.md']
collapsed: false
---

24 changes: 18 additions & 6 deletions docs/develop-with-kafka-api/compatibility.md
@@ -2,24 +2,31 @@

## Overview

-HStream also supports Kafka API since 0.19.0, so users can connect to HStream using Kafka clients. HStream implements [Kafka protocol](https://kafka.apache.org/protocol.html) underly, so you do not need to change any code in your current Kafka applications, just updating the Kafka URLs in your configurations to point to a HStream cluster, and that is it, then you can start streaming from your Kafka applications to a HStream cluster.
+HStream has also supported the Kafka API since 0.19.0, so users can connect to
+HStream using Kafka clients. HStream implements the
+[Kafka protocol](https://kafka.apache.org/protocol.html) natively, so you do
+not need to change any code in your existing Kafka applications: just update
+the Kafka URLs in your configuration to point to an HStream cluster, and you
+can start streaming from your Kafka applications to HStream right away.

::: tip

-Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to learn how to enable HStream'support of Kafka API.
+Refer to [get started with Kafka API](../start/get-started-with-kafka-api.md) to
+learn how to enable HStream's support of the Kafka API.

:::

## Compatibility with Apache Kafka

-HStream supports Apache Kafka version 0.11 and later, and most Kafka clients should be able to auto-negotiate protocol versions.
+HStream supports Apache Kafka version 0.11 and later, and most Kafka clients
+should be able to auto-negotiate protocol versions.
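If a client fails to negotiate, most libraries let you pin the protocol version explicitly. A hedged kafka-python sketch (the parameter values here are illustrative, not from this PR):

```python
from kafka import KafkaConsumer

# Pin the broker protocol version instead of relying on auto-negotiation.
# kafka-python accepts an api_version tuple; (0, 11) matches the minimum
# Kafka version HStream supports.
consumer = KafkaConsumer(
    "my_topic",
    bootstrap_servers="127.0.0.1:9092",
    api_version=(0, 11),
)
```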

Currently, the clients below have been tested with HStream.

| Language | Kafka Client |
| -------- | ----------------------------------------------------------- |
| Java | [Apache Kafka Java Client](https://github.com/apache/kafka) |
-| Python   | [kafka-python](https://github.com/dpkp/kafka-python)        |
+| Python   | [kafka-python], [confluent-kafka-python]                    |
| Go | [franz-go](https://github.com/twmb/franz-go) |
| C/C++ | [librdkafka](https://github.com/confluentinc/librdkafka) |

@@ -31,13 +38,18 @@ Recommend using the latest version of each Kafka client

## Apache Kafka features not yet supported

-HStream do not support below Kafka features now(we plan to support them in the later version):
+HStream does not support the following Kafka features yet (support is planned
+for a later version):

- Kafka transactions
- Quotas in Kafka

::: tip

-The configuration of Kafka brokers is not applicable to HStream, as HStream is a completely different implementation.
+The configuration of Kafka brokers is not applicable to HStream, as HStream is
+a completely different implementation.

:::

[kafka-python]: https://github.com/dpkp/kafka-python
[confluent-kafka-python]: https://github.com/confluentinc/confluent-kafka-python
60 changes: 60 additions & 0 deletions docs/develop-with-kafka-api/python.md
@@ -0,0 +1,60 @@
# Develop with Python Kafka client

This guide shows how to use a Python Kafka client to interact with
HStream. Currently, we support [kafka-python] and [confluent-kafka].

## Installation

```sh
# If you want to use kafka-python
pip install kafka-python

# Or if you want to use confluent-kafka
pip install confluent-kafka
```

::: tip

Prefer to use a virtual environment? Check out Python's built-in
[venv](https://docs.python.org/3/library/venv.html).

:::

## Create a Topic

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common create-topic

:::
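The `@snippet_group` directives above are expanded at build time from the example sources, so the code itself is not visible in this diff. For orientation, a minimal kafka-python sketch of the create-topic step; the broker address and topic name follow the quick-start defaults, and the authoritative code lives in kafka-examples/python/snippets/kafka_python.py:

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Create "my_topic" on the HStream Kafka endpoint (quick-start default address).
admin = KafkaAdminClient(bootstrap_servers="127.0.0.1:9092")
admin.create_topics(
    [NewTopic(name="my_topic", num_partitions=1, replication_factor=1)]
)
admin.close()
```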

## Produce Records

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common produce

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common produce

:::
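Again as a hedged sketch of the kafka-python path (the snippet files hold the exact code):

```python
from kafka import KafkaProducer

# kafka-python sends asynchronously; flush() blocks until the records are delivered.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
for i in range(5):
    producer.send(
        "my_topic",
        key=f"key {i}".encode(),
        value=f"hello, hstream {i}".encode(),
    )
producer.flush()
producer.close()
```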

## Consume Records

::: code-group

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common consume

<!-- prettier-ignore -->
@snippet_group kafka-examples/python/snippets/confluent_kafka_python.py [confluent-kafka] common consume

:::
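And a matching consume sketch; the group id is illustrative, and the real snippet may differ:

```python
from kafka import KafkaConsumer

# Read back the five records; disable auto-commit to mirror the confluent example.
consumer = KafkaConsumer(
    "my_topic",
    bootstrap_servers="127.0.0.1:9092",
    group_id="kafka_python_group",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
)
for i, msg in enumerate(consumer):
    print(f'key="{msg.key.decode()}", value="{msg.value.decode()}"')
    if i >= 4:
        break
consumer.close()
```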

[kafka-python]: https://github.com/dpkp/kafka-python
[confluent-kafka]: https://github.com/confluentinc/confluent-kafka-python
2 changes: 1 addition & 1 deletion docs/develop/_index.md
@@ -3,4 +3,4 @@ order: ['write', 'receive', 'process', 'ingest-and-distribute']
collapsed: false
---

-Develope
+Develop
1 change: 1 addition & 0 deletions embed.sh
@@ -4,4 +4,5 @@
if [[ "$EMBED_ME_ARGS" == "" ]]; then
  ./include_snippets.py --snippets-root . --file-pattern "docs/**/write/*.md"
  ./include_snippets.py --snippets-root . --file-pattern "docs/**/receive/*.md"
+  ./include_snippets.py --snippets-root . --file-pattern "docs/**/python.md"
fi
20 changes: 15 additions & 5 deletions include_snippets.py
@@ -2,10 +2,10 @@

import os

FILE_TYPES = {".py": "python", "cpp": "cpp"}
FILE_TYPES = {".py": ("python", "[Python]"), ".cpp": ("cpp", "[C++]")}


def run_snippet_cmd(snippets_root, snippet_file, *labels):
def run_snippet_cmd(snippets_root, snippet_file, *labels, code_group=None):
lines = [[] for _ in labels]
with open(os.path.join(snippets_root, snippet_file), "r") as f:
idx = None
@@ -24,12 +24,22 @@ def run_snippet_cmd(snippets_root, snippet_file, *labels):
                lines[idx].append(line)
    # TODO: strip indent
    extension = os.path.splitext(snippet_file)[1]
-    ft = FILE_TYPES[extension]
+    (ft, _code_group) = FILE_TYPES[extension]
+    code_group = _code_group if code_group is None else code_group
    blocks = "\n\n\n".join("".join(xs).strip() for xs in lines)
-    return f"```{ft} [Python]\n{blocks}\n```\n"
+    return f"```{ft} {code_group}\n{blocks}\n```\n"


-support_commands = [("@snippet", run_snippet_cmd)]
+def run_snippet_cmd_group(snippets_root, snippet_file, group, *labels):
+    return run_snippet_cmd(
+        snippets_root, snippet_file, *labels, code_group=group
+    )
+
+
+support_commands = [
+    ("@snippet", run_snippet_cmd),
+    ("@snippet_group", run_snippet_cmd_group),
+]


def parse_commands(lines):
    ...
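To make the new directive concrete, here is a hedged sketch of the expansion it performs; the path and labels are copied from python.md above, and the surrounding dispatch in `parse_commands` is truncated in this diff:

```python
# Equivalent call for the directive:
#   @snippet_group kafka-examples/python/snippets/kafka_python.py [kafka-python] common create-topic
out = run_snippet_cmd_group(
    ".",                                               # snippets root
    "kafka-examples/python/snippets/kafka_python.py",  # snippet file
    "[kafka-python]",                                   # code-group tab label
    "common", "create-topic",                           # labeled sections to join
)
# `out` is a fenced block whose info string carries the tab label, e.g.
# "```python [kafka-python]\n<common + create-topic sections>\n```\n"
```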
106 changes: 106 additions & 0 deletions kafka-examples/python/snippets/confluent_kafka_python.py
@@ -0,0 +1,106 @@
# [common]

import os
from confluent_kafka import Producer, Consumer
from confluent_kafka.admin import AdminClient, NewTopic

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 9092)

topic_name = "my_topic"
group_id = "confluent_kafka_group"

conf = {
    "bootstrap.servers": host + ":" + str(port),
    "client.id": "confluent_kafka_client",
}
# [common]


# [create-topic]
def create_topic():
    admin = AdminClient(conf)
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    admin.create_topics([new_topic])


# [create-topic]


# [produce]
def produce():
    def acked(err, msg):
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(
                f"Message produced: offset={msg.offset()}, "
                f'key="{msg.key().decode()}", '
                f'value="{msg.value().decode()}"'
            )

    producer = Producer(conf)
    for i in range(5):
        producer.produce(
            topic_name,
            key=b"key " + str(i).encode(),
            value=b"hello, hstream " + str(i).encode(),
            on_delivery=acked,
        )
    producer.flush()


# [produce]


# [consume]
def consume():
    consumer = Consumer(
        {
            **conf,
            "group.id": group_id,
            "auto.offset.reset": "smallest",
            "enable.auto.commit": "false",
            "isolation.level": "read_uncommitted",
        }
    )
    consumer.subscribe([topic_name])
    i = 0
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print(f"ERROR: {msg.error()}")
            else:
                # Extract the (optional) key and value, and print.
                print(
                    f"Consumed topic {msg.topic()}: "
                    f'key="{msg.key().decode()}", '
                    f'value="{msg.value().decode()}"'
                )
                i += 1
                if i >= 5:
                    break
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


# [consume]


if __name__ == "__main__":
    try:
        create_topic()
        produce()
        consume()
    finally:
        admin = AdminClient(conf)
        admin.delete_topics([topic_name])
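For reference (not part of the diff): the script reads `GUIDE_HOST` and `GUIDE_PORT` at the top, and the defaults already match the quick-start compose setup, so against that cluster it should run as-is, or explicitly with something like `GUIDE_HOST=127.0.0.1 GUIDE_PORT=9092 python3 kafka-examples/python/snippets/confluent_kafka_python.py`.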