diff --git a/docs/api-reference/quixstreams.md b/docs/api-reference/quixstreams.md index 8710776f0..14f6a3f34 100644 --- a/docs/api-reference/quixstreams.md +++ b/docs/api-reference/quixstreams.md @@ -10533,6 +10533,7 @@ processing all nested files within it. Expects folder and file structures as generated by the related FileSink connector: +``` my_topics/ ├── topic_a/ │ ├── 0/ @@ -10542,7 +10543,8 @@ my_topics/ │ ├── 0003.ext │ └── 0016.ext └── topic_b/ -└── etc... + └── etc... +``` Intended to be used with a single topic (ex: topic_a), but will recursively read from whatever entrypoint is passed to it. @@ -10551,22 +10553,23 @@ File format structure depends on the file format. See the `.formats` and `.compressions` modules to see what is supported. -**Example**: +Example Usage: - - from quixstreams import Application - from quixstreams.sources.community.file import FileSource - - app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") - source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", - ) - sdf = app.dataframe(source=source).print(metadata=True) - - if __name__ == "__main__": - app.run() +```python +from quixstreams import Application +from quixstreams.sources.community.file import FileSource + +app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") +source = FileSource( + filepath="/path/to/my/topic_folder", + file_format="json", + file_compression="gzip", +) +sdf = app.dataframe(source=source).print(metadata=True) + +if __name__ == "__main__": + app.run() +``` @@ -10581,7 +10584,7 @@ def __init__(filepath: Union[str, Path], shutdown_timeout: float = 10) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L59) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63) **Arguments**: @@ -10606,7 +10609,7 @@ to gracefully shutdown def 
default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L106) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110) Uses the file structure to generate the desired partition count for the @@ -10634,6 +10637,182 @@ the original default topic, with updated partition count This module contains Sources developed and maintained by the members of Quix Streams community. + + +## quixstreams.sources.community.pubsub + + + +## quixstreams.sources.community.pubsub.consumer + + + +### PubSubSubscriptionNotFound + +```python +class PubSubSubscriptionNotFound(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L30) + +Raised when an expected subscription does not exist + + + +### PubSubConsumer + +```python +class PubSubConsumer() +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L34) + + + +#### PubSubConsumer.poll\_and\_process + +```python +def poll_and_process(timeout: Optional[float] = None) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L105) + +This uses the asynchronous puller to retrieve and handle a message with its +assigned callback. + +Committing is a separate step. + + + +#### PubSubConsumer.poll\_and\_process\_batch + +```python +def poll_and_process_batch() +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L122) + +Polls and processes until either the max_batch_size or batch_timeout is reached. 
+ + + +#### PubSubConsumer.subscribe + +```python +def subscribe() +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L134) + +Asynchronous subscribers require subscribing (synchronous do not). + +NOTE: This will not detect whether the subscription exists. + + + +#### PubSubConsumer.handle\_subscription + +```python +def handle_subscription() -> Subscription +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/consumer.py#L144) + +Handles subscription management in one place. + +Subscriptions work similarly to Kafka consumer groups. + +- Each topic can have multiple subscriptions (consumer group ~= subscription). + +- A subscription can have multiple subscribers (similar to consumers in a group). + +- NOTE: exactly-once adds message methods (ack_with_response) when enabled. + + + +## quixstreams.sources.community.pubsub.pubsub + + + +### PubSubSource + +```python +class PubSubSource(Source) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/pubsub.py#L16) + +This source enables reading from a Google Cloud Pub/Sub topic, +dumping it to a kafka topic using desired SDF-based transformations. + +Provides "at-least-once" guarantees. + +Currently, forwarding message keys ("ordered messages" in Pub/Sub) is unsupported. + +The incoming message value will be in bytes, so transform in your SDF accordingly. + +Example Usage: + +```python +from quixstreams import Application +from quixstreams.sources.community.pubsub import PubSubSource +from os import environ + +source = PubSubSource( + # Suggested: pass JSON-formatted credentials from an environment variable. + service_account_json = environ["PUBSUB_SERVICE_ACCOUNT_JSON"], + project_id="", + topic_id="", # NOTE: NOT the full /x/y/z path! + subscription_id="", # NOTE: NOT the full /x/y/z path! 
+ create_subscription=True, +) +app = Application( + broker_address="localhost:9092", + auto_offset_reset="earliest", + consumer_group="gcp", + loglevel="INFO" +) +sdf = app.dataframe(source=source).print(metadata=True) + +if __name__ == "__main__": + app.run() +``` + + + +#### PubSubSource.\_\_init\_\_ + +```python +def __init__(project_id: str, + topic_id: str, + subscription_id: str, + service_account_json: Optional[str] = None, + commit_every: int = 100, + commit_interval: float = 5.0, + create_subscription: bool = False, + enable_message_ordering: bool = False, + shutdown_timeout: float = 10.0) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/pubsub.py#L55) + +**Arguments**: + +- `project_id`: a Google Cloud project ID. +- `topic_id`: a Pub/Sub topic ID (NOT the full path). +- `subscription_id`: a Pub/Sub subscription ID (NOT the full path). +- `service_account_json`: a Google Cloud Credentials JSON as a string +Can instead use environment variables (which have different behavior): +- "GOOGLE_APPLICATION_CREDENTIALS" set to a JSON filepath i.e. /x/y/z.json +- "PUBSUB_EMULATOR_HOST" set to a URL if using an emulated Pub/Sub +- `commit_every`: max records allowed to be processed before committing. +- `commit_interval`: max allowed elapsed time between commits. +- `create_subscription`: whether to attempt to create a subscription at +startup; if it already exists, it instead logs its details (DEBUG level). +- `enable_message_ordering`: When creating a Pub/Sub subscription, whether +to allow message ordering. NOTE: does NOT affect existing subscriptions! +- `shutdown_timeout`: How long to wait for a graceful shutdown of the source. + ## quixstreams.sources.base diff --git a/docs/api-reference/sources.md b/docs/api-reference/sources.md index cccf88cdf..c2869cd8c 100644 --- a/docs/api-reference/sources.md +++ b/docs/api-reference/sources.md @@ -640,6 +640,7 @@ processing all nested files within it. 
Expects folder and file structures as generated by the related FileSink connector: +``` my_topics/ ├── topic_a/ │ ├── 0/ @@ -649,7 +650,8 @@ my_topics/ │ ├── 0003.ext │ └── 0016.ext └── topic_b/ -└── etc... + └── etc... +``` Intended to be used with a single topic (ex: topic_a), but will recursively read from whatever entrypoint is passed to it. @@ -658,22 +660,23 @@ File format structure depends on the file format. See the `.formats` and `.compressions` modules to see what is supported. -**Example**: +Example Usage: - - from quixstreams import Application - from quixstreams.sources.community.file import FileSource - - app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") - source = FileSource( - filepath="/path/to/my/topic_folder", - file_format="json", - file_compression="gzip", - ) - sdf = app.dataframe(source=source).print(metadata=True) - - if __name__ == "__main__": - app.run() +```python +from quixstreams import Application +from quixstreams.sources.community.file import FileSource + +app = Application(broker_address="localhost:9092", auto_offset_reset="earliest") +source = FileSource( + filepath="/path/to/my/topic_folder", + file_format="json", + file_compression="gzip", +) +sdf = app.dataframe(source=source).print(metadata=True) + +if __name__ == "__main__": + app.run() +``` @@ -690,7 +693,7 @@ def __init__(filepath: Union[str, Path], shutdown_timeout: float = 10) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L59) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L63)
@@ -719,7 +722,7 @@ to gracefully shutdown def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L106) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/file/file.py#L110) Uses the file structure to generate the desired partition count for the @@ -776,3 +779,92 @@ with {_key: str, _value: dict, _timestamp: int}. ## quixstreams.sources.community.file.formats.parquet + + +## quixstreams.sources.community.pubsub.pubsub + + + +### PubSubSource + +```python +class PubSubSource(Source) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/pubsub.py#L16) + +This source enables reading from a Google Cloud Pub/Sub topic, +dumping it to a kafka topic using desired SDF-based transformations. + +Provides "at-least-once" guarantees. + +Currently, forwarding message keys ("ordered messages" in Pub/Sub) is unsupported. + +The incoming message value will be in bytes, so transform in your SDF accordingly. + +Example Usage: + +```python +from quixstreams import Application +from quixstreams.sources.community.pubsub import PubSubSource +from os import environ + +source = PubSubSource( + # Suggested: pass JSON-formatted credentials from an environment variable. + service_account_json = environ["PUBSUB_SERVICE_ACCOUNT_JSON"], + project_id="", + topic_id="", # NOTE: NOT the full /x/y/z path! + subscription_id="", # NOTE: NOT the full /x/y/z path! + create_subscription=True, +) +app = Application( + broker_address="localhost:9092", + auto_offset_reset="earliest", + consumer_group="gcp", + loglevel="INFO" +) +sdf = app.dataframe(source=source).print(metadata=True) + +if __name__ == "__main__": + app.run() +``` + + + +

+
+#### PubSubSource.\_\_init\_\_
+
+```python
+def __init__(project_id: str,
+             topic_id: str,
+             subscription_id: str,
+             service_account_json: Optional[str] = None,
+             commit_every: int = 100,
+             commit_interval: float = 5.0,
+             create_subscription: bool = False,
+             enable_message_ordering: bool = False,
+             shutdown_timeout: float = 10.0)
+```
+
+[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/community/pubsub/pubsub.py#L55)
+
+
+
+***Arguments:***
+
+- `project_id`: a Google Cloud project ID.
+- `topic_id`: a Pub/Sub topic ID (NOT the full path).
+- `subscription_id`: a Pub/Sub subscription ID (NOT the full path).
+- `service_account_json`: a Google Cloud Credentials JSON as a string
+Can instead use environment variables (which have different behavior):
+- "GOOGLE_APPLICATION_CREDENTIALS" set to a JSON filepath i.e. /x/y/z.json
+- "PUBSUB_EMULATOR_HOST" set to a URL if using an emulated Pub/Sub
+- `commit_every`: max records allowed to be processed before committing.
+- `commit_interval`: max allowed elapsed time between commits.
+- `create_subscription`: whether to attempt to create a subscription at
+startup; if it already exists, it instead logs its details (DEBUG level).
+- `enable_message_ordering`: When creating a Pub/Sub subscription, whether
+to allow message ordering. NOTE: does NOT affect existing subscriptions!
+- `shutdown_timeout`: How long to wait for a graceful shutdown of the source.
+
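
The `PubSubSource` description added in this diff notes that incoming message values arrive as bytes and should be transformed in the SDF. The following is a minimal sketch of such a transformation, assuming the Pub/Sub payloads are UTF-8 encoded JSON. That payload format and the helper name `decode_pubsub_value` are illustrative assumptions, not part of the library.

```python
import json
from typing import Any


def decode_pubsub_value(value: bytes) -> Any:
    """Decode a raw payload, assuming (hypothetically) UTF-8 encoded JSON."""
    return json.loads(value.decode("utf-8"))


# Standalone check with a hand-written payload (no Pub/Sub or Kafka required).
decoded = decode_pubsub_value(b'{"sensor": "a1", "reading": 21.5}')
print(decoded["reading"])
```

In an application like the one in the example usage, a helper of this kind could plausibly be attached with `app.dataframe(source=source).apply(decode_pubsub_value)`, so that downstream steps receive a parsed object rather than raw bytes. Payloads in another encoding would need a different decoder.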