From 8b4db4570035f24ffccec50b7d0104e6c7a1828e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 28 Nov 2024 13:22:50 +0100 Subject: [PATCH] Update documentation (#656) Co-authored-by: daniil-quix <133032822+daniil-quix@users.noreply.github.com> --- docs/api-reference/application.md | 32 +- docs/api-reference/context.md | 6 +- docs/api-reference/kafka.md | 168 +++---- docs/api-reference/quixstreams.md | 708 ++++++++++++++++++++---------- docs/api-reference/sinks.md | 348 +++++++++++++-- docs/api-reference/sources.md | 101 ++--- docs/api-reference/topics.md | 6 +- 7 files changed, 922 insertions(+), 447 deletions(-) diff --git a/docs/api-reference/application.md b/docs/api-reference/application.md index bbb7e3878..e860b1865 100644 --- a/docs/api-reference/application.md +++ b/docs/api-reference/application.md @@ -10,7 +10,7 @@ class Application() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L63) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L75) The main Application class. @@ -85,7 +85,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None, processing_guarantee: ProcessingGuarantee = "at-least-once") ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L101) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L113)
@@ -174,7 +174,7 @@ instead of the default one. def Quix(cls, *args, **kwargs) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L339) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L352) RAISES EXCEPTION: DEPRECATED. @@ -197,7 +197,7 @@ def topic(name: str, timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L371) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L384) Create a topic definition. @@ -279,7 +279,7 @@ def dataframe(topic: Optional[Topic] = None, source: Optional[BaseSource] = None) -> StreamingDataFrame ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L451) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L464) A simple helper method that generates a `StreamingDataFrame`, which is used @@ -335,7 +335,7 @@ to be used as an input topic. def stop(fail: bool = False) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L506) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L520) Stop the internal poll loop and the message processing. @@ -362,7 +362,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint. def get_producer() -> Producer ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L551) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L565) Create and return a pre-configured Producer instance. The Producer is initialized with params passed to Application. @@ -397,7 +397,7 @@ with app.get_producer() as producer: def get_consumer(auto_commit_enable: bool = True) -> Consumer ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L599) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L613) Create and return a pre-configured Consumer instance. @@ -454,7 +454,7 @@ with app.get_consumer() as consumer: def clear_state() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L649) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L663) Clear the state of the application. @@ -468,7 +468,7 @@ Clear the state of the application. def add_source(source: BaseSource, topic: Optional[Topic] = None) -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L655) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L669) Add a source to the application. @@ -494,7 +494,7 @@ Default: the source default def run(dataframe: Optional[StreamingDataFrame] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L685) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L699) Start processing data from Kafka using provided `StreamingDataFrame` @@ -530,7 +530,7 @@ app.run() def setup_topics() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L808) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L822) Validate and create the topics @@ -542,7 +542,7 @@ Validate and create the topics class ApplicationConfig(BaseSettings) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L984) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L998) Immutable object holding the application configuration @@ -557,7 +557,7 @@ For details see :class:`quixstreams.Application` ```python @classmethod def settings_customise_sources( - cls, settings_cls: Type[BaseSettings], + cls, settings_cls: Type[PydanticBaseSettings], init_settings: PydanticBaseSettingsSource, env_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource, @@ -565,7 +565,7 @@ def settings_customise_sources( ) -> Tuple[PydanticBaseSettingsSource, ...] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1019) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1033) Included to ignore reading/setting values from the environment @@ -579,7 +579,7 @@ Included to ignore reading/setting values from the environment def copy(**kwargs) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1032) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1046) Update the application config and return a copy diff --git a/docs/api-reference/context.md b/docs/api-reference/context.md index 4c38a5241..d4a413e96 100644 --- a/docs/api-reference/context.md +++ b/docs/api-reference/context.md @@ -12,7 +12,7 @@ def set_message_context(context: Optional[MessageContext]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L20) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L22) Set a MessageContext for the current message in the given `contextvars.Context` @@ -52,10 +52,10 @@ sdf = sdf.update(lambda value: alter_context(value)) #### message\_context ```python -def message_context() -> MessageContext +def message_context() -> Optional[MessageContext] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L51) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L53) Get a MessageContext for the current message, which houses most of the message diff --git a/docs/api-reference/kafka.md b/docs/api-reference/kafka.md index e91c3cc55..f74c7ef96 100644 --- a/docs/api-reference/kafka.md +++ b/docs/api-reference/kafka.md @@ -23,7 +23,7 @@ def __init__(broker_address: Union[str, ConnectionConfig], logger: logging.Logger = logger, error_callback: Callable[[KafkaError], None] = _default_error_cb, extra_config: Optional[dict] = None, - flush_timeout: Optional[int] = None) + flush_timeout: Optional[float] = None) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/producer.py#L43) @@ -156,21 +156,21 @@ A separate producer class used only internally for transactions ## quixstreams.kafka.consumer - + -### Consumer +### BaseConsumer ```python -class Consumer() +class BaseConsumer() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L67) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L68) - +

-#### Consumer.\_\_init\_\_ +#### BaseConsumer.\_\_init\_\_ ```python def __init__(broker_address: Union[str, ConnectionConfig], @@ -184,7 +184,7 @@ def __init__(broker_address: Union[str, ConnectionConfig], extra_config: Optional[dict] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L68) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L69) A wrapper around `confluent_kafka.Consumer`. @@ -217,17 +217,17 @@ Passed as "offset_commit_cb" to `confluent_kafka.Consumer`. will be passed to `confluent_kafka.Consumer` as is. Note: values passed as arguments override values in `extra_config`. - +

-#### Consumer.poll +#### BaseConsumer.poll ```python def poll(timeout: Optional[float] = None) -> Optional[Message] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L131) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L132) Consumes a single message, calls callbacks and returns events. @@ -255,55 +255,17 @@ event or callback. None or -1 is infinite. Default: None. `Optional[Message]`: A `Message` object or `None` on timeout - - -

- -#### Consumer.subscribe - -```python -def subscribe(topics: List[str], - on_assign: Optional[RebalancingCallback] = None, - on_revoke: Optional[RebalancingCallback] = None, - on_lost: Optional[RebalancingCallback] = None) -``` - -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L150) - -Set subscription to supplied list of topics - -This replaces a previous subscription. - - -
-***Arguments:*** - -- `topics` (`List[str]`): List of topics (strings) to subscribe to. -- `on_assign` (`Optional[RebalancingCallback]`): callback to provide handling of -customized offsets on completion of a successful partition re-assignment. -- `on_revoke` (`Optional[RebalancingCallback]`): callback to provide handling of -offset commits to a customized store on the start of a rebalance operation. -- `on_lost` (`Optional[RebalancingCallback]`): callback to provide handling in -the case the partition assignment has been lost. Partitions that have been -lost may already be owned by other members in the group and therefore -committing offsets, for example, may fail. - -**Raises**: - -- `KafkaException`: if a Kafka-based error occurs -- `RuntimeError`: if called on a closed consumer - - +

-#### Consumer.unsubscribe +#### BaseConsumer.unsubscribe ```python def unsubscribe() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L234) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L235) Remove current subscription. @@ -312,18 +274,18 @@ Remove current subscription. - `KafkaException`: if a Kafka-based error occurs - `RuntimeError`: if called on a closed consumer - +

-#### Consumer.store\_offsets +#### BaseConsumer.store\_offsets ```python def store_offsets(message: Optional[Message] = None, offsets: Optional[List[TopicPartition]] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L243) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L244) Store offsets for a message or a list of offsets. @@ -344,11 +306,11 @@ Note that 'enable.auto.offset.store' must be set to False when using this API. - `KafkaException`: if a Kafka-based error occurs - `RuntimeError`: if called on a closed consumer - +

-#### Consumer.commit +#### BaseConsumer.commit ```python def commit(message: Optional[Message] = None, @@ -356,7 +318,7 @@ def commit(message: Optional[Message] = None, asynchronous: bool = True) -> Optional[List[TopicPartition]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L274) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L275) Commit a message or a list of offsets. @@ -383,18 +345,18 @@ each partition should be checked for success. - `KafkaException`: if a Kafka-based error occurs - `RuntimeError`: if called on a closed consumer - +

-#### Consumer.committed +#### BaseConsumer.committed ```python def committed(partitions: List[TopicPartition], timeout: Optional[float] = None) -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L314) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L316) Retrieve committed offsets for the specified partitions. @@ -417,11 +379,11 @@ None or -1 is infinite. Default: None `List[TopicPartition]`: List of topic+partitions with offset and possibly error set. - +

-#### Consumer.get\_watermark\_offsets +#### BaseConsumer.get\_watermark\_offsets ```python def get_watermark_offsets(partition: TopicPartition, @@ -429,7 +391,7 @@ def get_watermark_offsets(partition: TopicPartition, cached: bool = False) -> Tuple[int, int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L332) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L334) Retrieve low and high offsets for the specified partition. @@ -457,18 +419,18 @@ message fetched from the broker for this partition. `Tuple[int, int]`: Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1. - +

-#### Consumer.list\_topics +#### BaseConsumer.list\_topics ```python def list_topics(topic: Optional[str] = None, timeout: Optional[float] = None) -> ClusterMetadata ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L358) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L360) Request metadata from the cluster. @@ -490,17 +452,17 @@ None or -1 is infinite. Default: None - `KafkaException`: if a Kafka-based error occurs - +

-#### Consumer.memberid +#### BaseConsumer.memberid ```python def memberid() -> Optional[str] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L379) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L381) Return this client's broker-assigned group member id. @@ -517,18 +479,18 @@ the consumer during rebalance. `Optional[string]`: Member id string or None - +

-#### Consumer.offsets\_for\_times +#### BaseConsumer.offsets\_for\_times ```python def offsets_for_times(partitions: List[TopicPartition], timeout: Optional[float] = None) -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L392) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L394) Look up offsets by timestamp for the specified partitions. @@ -557,17 +519,17 @@ None or -1 is infinite. Default: None `List[TopicPartition]`: List of topic+partition with offset field set and possibly error set - +

-#### Consumer.pause +#### BaseConsumer.pause ```python def pause(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L418) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L420) Pause consumption for the provided list of partitions. @@ -585,17 +547,17 @@ Does NOT affect the result of `Consumer.assignment()`. - `KafkaException`: if a Kafka-based error occurs - +

-#### Consumer.resume +#### BaseConsumer.resume ```python def resume(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L431) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L433) Resume consumption for the provided list of partitions. @@ -609,17 +571,17 @@ Resume consumption for the provided list of partitions. - `KafkaException`: if a Kafka-based error occurs - +

-#### Consumer.position +#### BaseConsumer.position ```python def position(partitions: List[TopicPartition]) -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L441) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L443) Retrieve current positions (offsets) for the specified partitions. @@ -642,17 +604,17 @@ the last consumed message + 1. `List[TopicPartition]`: List of topic+partitions with offset and possibly error set. - +

-#### Consumer.seek +#### BaseConsumer.seek ```python def seek(partition: TopicPartition) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L455) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L457) Set consume position for partition to offset. @@ -674,17 +636,17 @@ pass the offset in an `assign()` call. - `KafkaException`: if a Kafka-based error occurs - +

-#### Consumer.assignment +#### BaseConsumer.assignment ```python def assignment() -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L472) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L474) Returns the current partition assignment. @@ -699,17 +661,17 @@ Returns the current partition assignment. `List[TopicPartition]`: List of assigned topic+partitions. - +

-#### Consumer.set\_sasl\_credentials +#### BaseConsumer.set\_sasl\_credentials ```python def set_sasl_credentials(username: str, password: str) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L485) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L487) Sets the SASL credentials used for this client. @@ -726,17 +688,17 @@ This method is applicable only to SASL PLAIN and SCRAM mechanisms. - `username` (`str`): your username - `password` (`str`): your password - +

-#### Consumer.incremental\_assign +#### BaseConsumer.incremental\_assign ```python def incremental_assign(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L499) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L501) Assign new partitions. @@ -752,17 +714,17 @@ Any additional partitions besides the ones passed during the `Consumer` - `partitions` (`List[TopicPartition]`): a list of topic partitions - +

-#### Consumer.incremental\_unassign +#### BaseConsumer.incremental\_unassign ```python def incremental_unassign(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L513) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L515) Revoke partitions. @@ -774,17 +736,17 @@ Can be called outside an on_revoke callback. - `partitions` (`List[TopicPartition]`): a list of topic partitions - +

-#### Consumer.close +#### BaseConsumer.close ```python def close() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L523) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L525) Close down and terminate the Kafka Consumer. @@ -797,17 +759,17 @@ Actions performed: Registered callbacks may be called from this method, see `poll()` for more info. - +

-#### Consumer.consumer\_group\_metadata +#### BaseConsumer.consumer\_group\_metadata ```python def consumer_group_metadata() -> GroupMetadata ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L540) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L542) Used by the producer during consumer offset sending for an EOS transaction. diff --git a/docs/api-reference/quixstreams.md b/docs/api-reference/quixstreams.md index 6ece99d7a..a9ea689e2 100644 --- a/docs/api-reference/quixstreams.md +++ b/docs/api-reference/quixstreams.md @@ -2494,7 +2494,7 @@ def produce_row(row: Row, timestamp: Optional[int] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowproducer.py#L121) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowproducer.py#L119) Serialize Row to bytes according to the Topic serialization settings @@ -2518,7 +2518,7 @@ If this method fails, it will trigger the provided "on_error" callback. def poll(timeout: float = 0) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowproducer.py#L161) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowproducer.py#L159) Polls the producer for events and calls `on_delivery` callbacks. @@ -2536,7 +2536,7 @@ If `poll()` fails, it will trigger the provided "on_error" callback def abort_transaction(timeout: Optional[float] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowproducer.py#L232) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowproducer.py#L230) Attempt an abort if an active transaction. @@ -3495,6 +3495,43 @@ The serialized batch as bytes in Parquet format. ## quixstreams.sinks.community.file.formats + + +### InvalidFormatError + +```python +class InvalidFormatError(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/formats/__init__.py#L17) + +Raised when the format is specified incorrectly. + + + +#### resolve\_format + +```python +def resolve_format(format: Union[FormatName, Format]) -> Format +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/formats/__init__.py#L23) + +Resolves the format into a `Format` instance. + +**Arguments**: + +- `format`: The format to resolve, either a format name ("json", +"parquet") or a `Format` instance. + +**Raises**: + +- `InvalidFormatError`: If the format name is invalid. + +**Returns**: + +An instance of `Format` corresponding to the specified format. + ## quixstreams.sinks.community.file.formats.json @@ -3665,86 +3702,326 @@ The serialized batch as bytes. ## quixstreams.sinks.community.file.sink - + -### InvalidFormatError +### FileSink ```python -class InvalidFormatError(Exception) +class FileSink(BatchingSink) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L24) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L11) -Raised when the format is specified incorrectly. +A sink that writes data batches to files using configurable formats and +destinations. - +The sink groups messages by their topic and partition, ensuring data from the +same source is stored together. Each batch is serialized using the specified +format (e.g., JSON, Parquet) before being written to the configured +destination. -### FileSink +The destination determines the storage location and write behavior. By default, +it uses LocalDestination for writing to the local filesystem, but can be +configured to use other storage backends (e.g., cloud storage). + + + +#### FileSink.\_\_init\_\_ ```python -class FileSink(BatchingSink) +def __init__(directory: str = "", + format: Union[FormatName, Format] = "json", + destination: Optional[Destination] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L30) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L25) -Writes batches of data to files on disk using specified formats. +Initialize the FileSink with the specified configuration. -Messages are grouped by their topic and partition. Data from messages with -the same topic and partition are saved in the same directory. Each batch of -messages is serialized and saved to a file within that directory. Files are -named using the batch's starting offset to ensure uniqueness and order. +**Arguments**: -If `append` is set to `True`, the sink will attempt to append data to an -existing file rather than creating a new one. This is only supported for -formats that allow appending. +- `directory`: Base directory path for storing files. Defaults to +current directory. +- `format`: Data serialization format, either as a string +("json", "parquet") or a Format instance. +- `destination`: Storage destination handler. Defaults to +LocalDestination if not specified. - + -#### FileSink.\_\_init\_\_ +#### FileSink.write ```python -def __init__(output_dir: str, - format: Union[FormatName, Format], - append: bool = False) -> None +def write(batch: SinkBatch) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L44) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L46) -Initializes the FileSink. +Write a batch of data using the configured format and destination. + +The method performs the following steps: +1. Serializes the batch data using the configured format +2. Writes the serialized data to the destination +3. Handles any write failures by raising a backpressure error **Arguments**: -- `output_dir`: The directory where files will be written. -- `format`: The data serialization format to use. This can be either a -format name ("json", "parquet") or an instance of a `Format` -subclass. -- `append`: If `True`, data will be appended to existing files when possible. -Note that not all formats support appending. Defaults to `False`. +- `batch`: The batch of data to write. **Raises**: -- `ValueError`: If `append` is `True` but the specified format does not +- `SinkBackpressureError`: If the write operation fails, indicating +that the sink needs backpressure with a 5-second retry delay. + + + +## quixstreams.sinks.community.file.destinations.local + + + +### LocalDestination + +```python +class LocalDestination(Destination) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L15) + +A destination that writes data to the local filesystem. + +Handles writing data to local files with support for both creating new files +and appending to existing ones. + + + +#### LocalDestination.\_\_init\_\_ + +```python +def __init__(append: bool = False) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L22) + +Initialize the local destination. + +**Arguments**: + +- `append`: If True, append to existing files instead of creating new +ones. Defaults to False. + + + +#### LocalDestination.set\_extension + +```python +def set_extension(format: Format) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L32) + +Set the file extension and validate append mode compatibility. + +**Arguments**: + +- `format`: The Format instance that defines the file extension. + +**Raises**: + +- `ValueError`: If append mode is enabled but the format doesn't support appending. - + -#### FileSink.write +#### LocalDestination.write ```python -def write(batch: SinkBatch) -> None +def write(data: bytes, batch: SinkBatch) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L43) + +Write data to a local file. + +**Arguments**: + +- `data`: The serialized data to write. +- `batch`: The batch information containing topic and partition details. + + + +## quixstreams.sinks.community.file.destinations + + + +## quixstreams.sinks.community.file.destinations.s3 + + + +### S3BucketNotFoundError + +```python +class S3BucketNotFoundError(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L13) + +Raised when the specified S3 bucket does not exist. + + + +### S3BucketAccessDeniedError + +```python +class S3BucketAccessDeniedError(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L17) + +Raised when the specified S3 bucket access is denied. + + + +### S3Destination + +```python +class S3Destination(Destination) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L21) + +A destination that writes data to Amazon S3. + +Handles writing data to S3 buckets using the AWS SDK. Credentials can be +provided directly or via environment variables. + + + +#### S3Destination.\_\_init\_\_ + +```python +def __init__(bucket: str, + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv( + "AWS_SECRET_ACCESS_KEY"), + region_name: Optional[str] = getenv("AWS_REGION", + getenv("AWS_DEFAULT_REGION")), + **kwargs) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L68) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L28) + +Initialize the S3 destination. + +**Arguments**: + +- `bucket`: Name of the S3 bucket to write to. +- `aws_access_key_id`: AWS access key ID. Defaults to AWS_ACCESS_KEY_ID +environment variable. +- `aws_secret_access_key`: AWS secret access key. Defaults to +AWS_SECRET_ACCESS_KEY environment variable. +- `region_name`: AWS region name. Defaults to AWS_REGION or +AWS_DEFAULT_REGION environment variable. +- `kwargs`: Additional keyword arguments passed to boto3.client. + +**Raises**: + +- `S3BucketNotFoundError`: If the specified bucket doesn't exist. +- `S3BucketAccessDeniedError`: If access to the bucket is denied. + + + +#### S3Destination.write -Writes a batch of data to files on disk, grouping data by topic and partition. +```python +def write(data: bytes, batch: SinkBatch) -> None +``` -If `append` is `True` and an existing file is found, data will be appended to -the last file. Otherwise, a new file is created based on the batch's starting -offset. +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L78) + +Write data to S3. **Arguments**: -- `batch`: The batch of data to write. +- `data`: The serialized data to write. +- `batch`: The batch information containing topic and partition details. + + + +## quixstreams.sinks.community.file.destinations.base + + + +### Destination + +```python +class Destination(ABC) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L16) + +Abstract base class for defining where and how data should be stored. + +Destinations handle the storage of serialized data, whether that's to local +disk, cloud storage, or other locations. They manage the physical writing of +data while maintaining a consistent directory/path structure based on topics +and partitions. + + + +#### Destination.set\_directory + +```python +def set_directory(directory: str) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L28) + +Configure the base directory for storing files. + +**Arguments**: + +- `directory`: The base directory path where files will be stored. + +**Raises**: + +- `ValueError`: If the directory path contains invalid characters. +Only alphanumeric characters (a-zA-Z0-9), spaces, dots, and +underscores are allowed. + + + +#### Destination.set\_extension + +```python +def set_extension(format: Format) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L45) + +Set the file extension based on the format. + +**Arguments**: + +- `format`: The Format instance that defines the file extension. + + + +#### Destination.write + +```python +@abstractmethod +def write(data: bytes, batch: SinkBatch) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L54) + +Write the serialized data to storage. + +**Arguments**: + +- `data`: The serialized data to write. +- `batch`: The batch information containing topic, partition and offset +details. @@ -5496,7 +5773,7 @@ or a ConnectionConfig object if authentication is required. def list_topics(timeout: float = -1) -> Dict[str, ConfluentTopicMetadata] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L88) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L91) Get a list of topics and their metadata from a Kafka cluster @@ -5517,7 +5794,7 @@ def inspect_topics(topic_names: List[str], timeout: float = 30) -> Dict[str, Optional[TopicConfig]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L99) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L102) A simplified way of getting the topic configurations of the provided topics @@ -5543,7 +5820,7 @@ def create_topics(topics: List[Topic], finalize_timeout: float = 60) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L181) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L184) Create the given list of topics and confirm they are ready. @@ -8943,7 +9220,7 @@ Raised from `on_assign`, `on_revoke` and `on_lost` callbacks def set_message_context(context: Optional[MessageContext]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L20) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L22) Set a MessageContext for the current message in the given `contextvars.Context` @@ -8977,10 +9254,10 @@ sdf = sdf.update(lambda value: alter_context(value)) #### message\_context ```python -def message_context() -> MessageContext +def message_context() -> Optional[MessageContext] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L51) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/context.py#L53) Get a MessageContext for the current message, which houses most of the message @@ -9019,7 +9296,7 @@ instance of `MessageContext` class ConnectionConfig(BaseSettings) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L16) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L21) Provides an interface for all librdkafka connection-based configs. @@ -9034,7 +9311,7 @@ Also obscures secrets and handles any case sensitivity issues. ```python @classmethod def settings_customise_sources( - cls, settings_cls: Type[BaseSettings], + cls, settings_cls: Type[PydanticBaseSettings], init_settings: PydanticBaseSettingsSource, env_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource, @@ -9042,7 +9319,7 @@ def settings_customise_sources( ) -> Tuple[PydanticBaseSettingsSource, ...] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L94) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L99) Included to ignore reading/setting values from the environment @@ -9057,7 +9334,7 @@ def from_librdkafka_dict(cls, ignore_extras: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L108) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L113) Create a `ConnectionConfig` from a librdkafka config dictionary. @@ -9078,7 +9355,7 @@ a ConnectionConfig def as_librdkafka_dict(plaintext_secrets: bool = True) -> dict ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L123) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/configuration.py#L128) Dump any non-empty config values as a librdkafka dictionary. @@ -9119,7 +9396,7 @@ def __init__(broker_address: Union[str, ConnectionConfig], logger: logging.Logger = logger, error_callback: Callable[[KafkaError], None] = _default_error_cb, extra_config: Optional[dict] = None, - flush_timeout: Optional[int] = None) + flush_timeout: Optional[float] = None) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/producer.py#L43) @@ -9236,19 +9513,19 @@ A separate producer class used only internally for transactions ## quixstreams.kafka.consumer - + -### Consumer +### BaseConsumer ```python -class Consumer() +class BaseConsumer() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L67) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L68) - + -#### Consumer.\_\_init\_\_ +#### BaseConsumer.\_\_init\_\_ ```python def __init__(broker_address: Union[str, ConnectionConfig], @@ -9262,7 +9539,7 @@ def __init__(broker_address: Union[str, ConnectionConfig], extra_config: Optional[dict] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L68) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L69) A wrapper around `confluent_kafka.Consumer`. @@ -9293,15 +9570,15 @@ Passed as "offset_commit_cb" to `confluent_kafka.Consumer`. will be passed to `confluent_kafka.Consumer` as is. Note: values passed as arguments override values in `extra_config`. - + -#### Consumer.poll +#### BaseConsumer.poll ```python def poll(timeout: Optional[float] = None) -> Optional[Message] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L131) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L132) Consumes a single message, calls callbacks and returns events. @@ -9325,49 +9602,15 @@ event or callback. None or -1 is infinite. Default: None. `Optional[Message]`: A `Message` object or `None` on timeout - - -#### Consumer.subscribe - -```python -def subscribe(topics: List[str], - on_assign: Optional[RebalancingCallback] = None, - on_revoke: Optional[RebalancingCallback] = None, - on_lost: Optional[RebalancingCallback] = None) -``` - -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L150) - -Set subscription to supplied list of topics - -This replaces a previous subscription. - -**Arguments**: - -- `topics` (`List[str]`): List of topics (strings) to subscribe to. -- `on_assign` (`Optional[RebalancingCallback]`): callback to provide handling of -customized offsets on completion of a successful partition re-assignment. -- `on_revoke` (`Optional[RebalancingCallback]`): callback to provide handling of -offset commits to a customized store on the start of a rebalance operation. -- `on_lost` (`Optional[RebalancingCallback]`): callback to provide handling in -the case the partition assignment has been lost. Partitions that have been -lost may already be owned by other members in the group and therefore -committing offsets, for example, may fail. - -**Raises**: - -- `KafkaException`: if a Kafka-based error occurs -- `RuntimeError`: if called on a closed consumer - - + -#### Consumer.unsubscribe +#### BaseConsumer.unsubscribe ```python def unsubscribe() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L234) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L235) Remove current subscription. @@ -9376,16 +9619,16 @@ Remove current subscription. - `KafkaException`: if a Kafka-based error occurs - `RuntimeError`: if called on a closed consumer - + -#### Consumer.store\_offsets +#### BaseConsumer.store\_offsets ```python def store_offsets(message: Optional[Message] = None, offsets: Optional[List[TopicPartition]] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L243) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L244) Store offsets for a message or a list of offsets. @@ -9404,9 +9647,9 @@ Note that 'enable.auto.offset.store' must be set to False when using this API. - `KafkaException`: if a Kafka-based error occurs - `RuntimeError`: if called on a closed consumer - + -#### Consumer.commit +#### BaseConsumer.commit ```python def commit(message: Optional[Message] = None, @@ -9414,7 +9657,7 @@ def commit(message: Optional[Message] = None, asynchronous: bool = True) -> Optional[List[TopicPartition]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L274) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L275) Commit a message or a list of offsets. @@ -9439,16 +9682,16 @@ each partition should be checked for success. - `KafkaException`: if a Kafka-based error occurs - `RuntimeError`: if called on a closed consumer - + -#### Consumer.committed +#### BaseConsumer.committed ```python def committed(partitions: List[TopicPartition], timeout: Optional[float] = None) -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L314) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L316) Retrieve committed offsets for the specified partitions. @@ -9467,9 +9710,9 @@ None or -1 is infinite. Default: None `List[TopicPartition]`: List of topic+partitions with offset and possibly error set. - + -#### Consumer.get\_watermark\_offsets +#### BaseConsumer.get\_watermark\_offsets ```python def get_watermark_offsets(partition: TopicPartition, @@ -9477,7 +9720,7 @@ def get_watermark_offsets(partition: TopicPartition, cached: bool = False) -> Tuple[int, int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L332) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L334) Retrieve low and high offsets for the specified partition. @@ -9501,16 +9744,16 @@ message fetched from the broker for this partition. `Tuple[int, int]`: Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1. - + -#### Consumer.list\_topics +#### BaseConsumer.list\_topics ```python def list_topics(topic: Optional[str] = None, timeout: Optional[float] = None) -> ClusterMetadata ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L358) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L360) Request metadata from the cluster. @@ -9530,15 +9773,15 @@ None or -1 is infinite. Default: None - `KafkaException`: if a Kafka-based error occurs - + -#### Consumer.memberid +#### BaseConsumer.memberid ```python def memberid() -> Optional[str] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L379) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L381) Return this client's broker-assigned group member id. @@ -9553,16 +9796,16 @@ the consumer during rebalance. `Optional[string]`: Member id string or None - + -#### Consumer.offsets\_for\_times +#### BaseConsumer.offsets\_for\_times ```python def offsets_for_times(partitions: List[TopicPartition], timeout: Optional[float] = None) -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L392) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L394) Look up offsets by timestamp for the specified partitions. @@ -9587,15 +9830,15 @@ None or -1 is infinite. Default: None `List[TopicPartition]`: List of topic+partition with offset field set and possibly error set - + -#### Consumer.pause +#### BaseConsumer.pause ```python def pause(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L418) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L420) Pause consumption for the provided list of partitions. @@ -9611,15 +9854,15 @@ Does NOT affect the result of `Consumer.assignment()`. - `KafkaException`: if a Kafka-based error occurs - + -#### Consumer.resume +#### BaseConsumer.resume ```python def resume(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L431) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L433) Resume consumption for the provided list of partitions. @@ -9631,15 +9874,15 @@ Resume consumption for the provided list of partitions. - `KafkaException`: if a Kafka-based error occurs - + -#### Consumer.position +#### BaseConsumer.position ```python def position(partitions: List[TopicPartition]) -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L441) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L443) Retrieve current positions (offsets) for the specified partitions. @@ -9658,15 +9901,15 @@ the last consumed message + 1. `List[TopicPartition]`: List of topic+partitions with offset and possibly error set. - + -#### Consumer.seek +#### BaseConsumer.seek ```python def seek(partition: TopicPartition) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L455) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L457) Set consume position for partition to offset. @@ -9686,15 +9929,15 @@ pass the offset in an `assign()` call. - `KafkaException`: if a Kafka-based error occurs - + -#### Consumer.assignment +#### BaseConsumer.assignment ```python def assignment() -> List[TopicPartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L472) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L474) Returns the current partition assignment. @@ -9707,15 +9950,15 @@ Returns the current partition assignment. `List[TopicPartition]`: List of assigned topic+partitions. - + -#### Consumer.set\_sasl\_credentials +#### BaseConsumer.set\_sasl\_credentials ```python def set_sasl_credentials(username: str, password: str) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L485) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L487) Sets the SASL credentials used for this client. @@ -9730,15 +9973,15 @@ This method is applicable only to SASL PLAIN and SCRAM mechanisms. - `username` (`str`): your username - `password` (`str`): your password - + -#### Consumer.incremental\_assign +#### BaseConsumer.incremental\_assign ```python def incremental_assign(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L499) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L501) Assign new partitions. @@ -9752,15 +9995,15 @@ Any additional partitions besides the ones passed during the `Consumer` - `partitions` (`List[TopicPartition]`): a list of topic partitions - + -#### Consumer.incremental\_unassign +#### BaseConsumer.incremental\_unassign ```python def incremental_unassign(partitions: List[TopicPartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L513) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L515) Revoke partitions. @@ -9770,15 +10013,15 @@ Can be called outside an on_revoke callback. - `partitions` (`List[TopicPartition]`): a list of topic partitions - + -#### Consumer.close +#### BaseConsumer.close ```python def close() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L523) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L525) Close down and terminate the Kafka Consumer. @@ -9791,15 +10034,15 @@ Actions performed: Registered callbacks may be called from this method, see `poll()` for more info. - + -#### Consumer.consumer\_group\_metadata +#### BaseConsumer.consumer\_group\_metadata ```python def consumer_group_metadata() -> GroupMetadata ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L540) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/kafka/consumer.py#L542) Used by the producer during consumer offset sending for an EOS transaction. @@ -9819,7 +10062,7 @@ Used by the producer during consumer offset sending for an EOS transaction. class Application() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L63) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L75) The main Application class. @@ -9888,7 +10131,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None, processing_guarantee: ProcessingGuarantee = "at-least-once") ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L101) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L113) **Arguments**: @@ -9973,7 +10216,7 @@ instead of the default one. def Quix(cls, *args, **kwargs) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L339) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L352) RAISES EXCEPTION: DEPRECATED. @@ -9994,7 +10237,7 @@ def topic(name: str, timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L371) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L384) Create a topic definition. @@ -10066,7 +10309,7 @@ def dataframe(topic: Optional[Topic] = None, source: Optional[BaseSource] = None) -> StreamingDataFrame ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L451) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L464) A simple helper method that generates a `StreamingDataFrame`, which is used @@ -10114,7 +10357,7 @@ to be used as an input topic. def stop(fail: bool = False) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L506) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L520) Stop the internal poll loop and the message processing. @@ -10137,7 +10380,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint. def get_producer() -> Producer ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L551) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L565) Create and return a pre-configured Producer instance. The Producer is initialized with params passed to Application. @@ -10168,7 +10411,7 @@ with app.get_producer() as producer: def get_consumer(auto_commit_enable: bool = True) -> Consumer ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L599) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L613) Create and return a pre-configured Consumer instance. @@ -10219,7 +10462,7 @@ Default - True def clear_state() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L649) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L663) Clear the state of the application. @@ -10231,7 +10474,7 @@ Clear the state of the application. def add_source(source: BaseSource, topic: Optional[Topic] = None) -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L655) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L669) Add a source to the application. @@ -10253,7 +10496,7 @@ Default: the source default def run(dataframe: Optional[StreamingDataFrame] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L685) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L699) Start processing data from Kafka using provided `StreamingDataFrame` @@ -10285,7 +10528,7 @@ app.run() def setup_topics() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L808) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L822) Validate and create the topics @@ -10297,7 +10540,7 @@ Validate and create the topics class ApplicationConfig(BaseSettings) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L984) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L998) Immutable object holding the application configuration @@ -10310,7 +10553,7 @@ For details see :class:`quixstreams.Application` ```python @classmethod def settings_customise_sources( - cls, settings_cls: Type[BaseSettings], + cls, settings_cls: Type[PydanticBaseSettings], init_settings: PydanticBaseSettingsSource, env_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource, @@ -10318,7 +10561,7 @@ def settings_customise_sources( ) -> Tuple[PydanticBaseSettingsSource, ...] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1019) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1033) Included to ignore reading/setting values from the environment @@ -10330,7 +10573,7 @@ Included to ignore reading/setting values from the environment def copy(**kwargs) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1032) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/app.py#L1046) Update the application config and return a copy @@ -10433,18 +10676,18 @@ app.run() #### KafkaReplicatorSource.\_\_init\_\_ ```python -def __init__(name: str, - app_config: "ApplicationConfig", - topic: str, - broker_address: Union[str, ConnectionConfig], - auto_offset_reset: AutoOffsetReset = "latest", - consumer_extra_config: Optional[dict] = None, - consumer_poll_timeout: Optional[float] = None, - shutdown_timeout: float = 10, - on_consumer_error: Optional[ - ConsumerErrorCallback] = default_on_consumer_error, - value_deserializer: DeserializerType = "json", - key_deserializer: DeserializerType = "bytes") -> None +def __init__( + name: str, + app_config: "ApplicationConfig", + topic: str, + broker_address: Union[str, ConnectionConfig], + auto_offset_reset: Optional[AutoOffsetReset] = "latest", + consumer_extra_config: Optional[dict] = None, + consumer_poll_timeout: Optional[float] = None, + shutdown_timeout: float = 10, + on_consumer_error: ConsumerErrorCallback = default_on_consumer_error, + value_deserializer: DeserializerType = "json", + key_deserializer: DeserializerType = "bytes") -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/core/kafka/kafka.py#L54) @@ -10519,20 +10762,20 @@ app.run() #### QuixEnvironmentSource.\_\_init\_\_ ```python -def __init__(name: str, - app_config: "ApplicationConfig", - topic: str, - quix_sdk_token: str, - quix_workspace_id: str, - quix_portal_api: Optional[str] = None, - auto_offset_reset: Optional[AutoOffsetReset] = None, - consumer_extra_config: Optional[dict] = None, - consumer_poll_timeout: Optional[float] = None, - shutdown_timeout: float = 10, - on_consumer_error: Optional[ - ConsumerErrorCallback] = default_on_consumer_error, - value_deserializer: DeserializerType = "json", - key_deserializer: DeserializerType = "bytes") -> None +def __init__( + name: str, + app_config: "ApplicationConfig", + topic: str, + quix_sdk_token: str, + quix_workspace_id: str, + quix_portal_api: Optional[str] = None, + auto_offset_reset: Optional[AutoOffsetReset] = None, + consumer_extra_config: Optional[dict] = None, + consumer_poll_timeout: Optional[float] = None, + shutdown_timeout: float = 10, + on_consumer_error: ConsumerErrorCallback = default_on_consumer_error, + value_deserializer: DeserializerType = "json", + key_deserializer: DeserializerType = "bytes") -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/core/kafka/quix.py#L50) @@ -10567,7 +10810,8 @@ class CSVSource(Source) ```python def __init__(path: Union[str, Path], name: str, - key_extractor: Optional[Callable[[dict], AnyStr]] = None, + key_extractor: Optional[Callable[[dict], Union[str, + bytes]]] = None, timestamp_extractor: Optional[Callable[[dict], int]] = None, delay: float = 0, shutdown_timeout: float = 10, @@ -11274,7 +11518,7 @@ It configures the source's Kafka producer, the topic it will produce to and opti def start() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L102) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L110) This method is triggered in the subprocess when the source is started. @@ -11290,7 +11534,7 @@ Use it to fetch data and produce it to Kafka. def stop() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L111) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L119) This method is triggered when the application is shutting down. @@ -11305,7 +11549,7 @@ The source must ensure that the `run` method is completed soon. def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L119) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L127) This method is triggered when the topic is not provided to the source. @@ -11319,7 +11563,7 @@ The source must return a default topic configuration. class Source(BaseSource) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L127) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L135) A base class for custom Sources that provides a basic implementation of `BaseSource` interface. @@ -11377,7 +11621,7 @@ if __name__ == "__main__": def __init__(name: str, shutdown_timeout: float = 10) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L177) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L185) **Arguments**: @@ -11393,7 +11637,7 @@ def __init__(name: str, shutdown_timeout: float = 10) -> None def running() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L191) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L199) Property indicating if the source is running. @@ -11407,7 +11651,7 @@ The `stop` method will set it to `False`. Use it to stop the source gracefully. def cleanup(failed: bool) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L199) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L207) This method is triggered once the `run` method completes. @@ -11423,7 +11667,7 @@ It flushes the producer when `_run` completes successfully. def stop() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L210) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L218) This method is triggered when the application is shutting down. @@ -11434,10 +11678,10 @@ It sets the `running` property to `False`. #### Source.start ```python -def start() +def start() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L219) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L226) This method is triggered in the subprocess when the source is started. @@ -11452,7 +11696,7 @@ It marks the source as running, execute it's run method and ensure cleanup happe def run() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L235) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L242) This method is triggered in the subprocess when the source is started. @@ -11470,7 +11714,7 @@ def serialize(key: Optional[object] = None, timestamp_ms: Optional[int] = None) -> KafkaMessage ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L243) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L250) Serialize data to bytes using the producer topic serializers and return a `quixstreams.models.messages.KafkaMessage`. @@ -11492,7 +11736,7 @@ def produce(value: Optional[Union[str, bytes]] = None, buffer_error_max_tries: int = 3) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L259) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L266) Produce a message to the configured source topic in Kafka. @@ -11504,7 +11748,7 @@ Produce a message to the configured source topic in Kafka. def flush(timeout: Optional[float] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L284) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L291) This method flush the producer. @@ -11527,7 +11771,7 @@ None use producer default or -1 is infinite. Default: None def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L302) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L309) Return a default topic matching the source name. @@ -11545,7 +11789,7 @@ The default topic will not be used if the topic has already been provided to the class StatefulSource(Source) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L319) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L326) A `Source` class for custom Sources that need a state. @@ -11603,7 +11847,7 @@ if __name__ == "__main__": def __init__(name: str, shutdown_timeout: float = 10) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L369) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L376) **Arguments**: @@ -11622,7 +11866,7 @@ def configure(topic: Topic, **kwargs) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L379) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L386) This method is triggered before the source is started. @@ -11637,7 +11881,7 @@ It configures the source's Kafka producer, the topic it will produce to and the def store_partitions_count() -> int ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L398) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L405) Count of store partitions. @@ -11652,7 +11896,7 @@ Used to configure the number of partition in the changelog topic. def assigned_store_partition() -> int ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L407) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L414) The store partition assigned to this instance @@ -11665,7 +11909,7 @@ The store partition assigned to this instance def store_name() -> str ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L414) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L421) The source store name @@ -11678,7 +11922,7 @@ The source store name def state() -> State ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L421) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L428) Access the `State` of the source. @@ -11694,7 +11938,7 @@ Important: after each `.flush()` call, a previously returned instance is invalid def flush(timeout: Optional[float] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L440) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L447) This method commit the state and flush the producer. @@ -11718,10 +11962,10 @@ None use producer default or -1 is infinite. Default: None ### SourceProcess ```python -class SourceProcess(multiprocessing.Process) +class SourceProcess(process) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L24) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L30) An implementation of the Source subprocess. @@ -11738,7 +11982,7 @@ Some methods are designed to be used from the parent process, and others from th def run() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L74) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L80) An entrypoint of the child process. @@ -11755,7 +11999,7 @@ Responsible for: def raise_for_error() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L189) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L196) Raise a `quixstreams.sources.manager.SourceException` if the child process was terminated with an exception. @@ -11768,7 +12012,7 @@ if the child process was terminated with an exception. def stop() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L213) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L220) Handle shutdown of the source and its subprocess. @@ -11784,7 +12028,7 @@ is still alive, it will kill it with a SIGKILL. class SourceManager() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L236) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L243) Class managing the sources registered with the app @@ -11799,7 +12043,7 @@ def register(source: BaseSource, topic, producer, consumer, topic_manager) -> SourceProcess ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L246) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L253) Register a new source in the manager. @@ -11813,7 +12057,7 @@ Each source need to already be configured, can't reuse a topic and must be uniqu def raise_for_error() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L297) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L304) Raise an exception if any process has stopped with an exception @@ -11825,7 +12069,7 @@ Raise an exception if any process has stopped with an exception def is_alive() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L304) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/manager.py#L311) Check if any process is alive @@ -11846,7 +12090,7 @@ True if at least one process is alive ### RowConsumer ```python -class RowConsumer(Consumer) +class RowConsumer(BaseConsumer) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowconsumer.py#L19) @@ -11860,8 +12104,8 @@ def __init__(broker_address: Union[str, ConnectionConfig], consumer_group: str, auto_offset_reset: AutoOffsetReset, auto_commit_enable: bool = True, - on_commit: Callable[[Optional[KafkaError], List[TopicPartition]], - None] = None, + on_commit: Optional[Callable[ + [Optional[KafkaError], List[TopicPartition]], None]] = None, extra_config: Optional[dict] = None, on_error: Optional[ConsumerErrorCallback] = None) ``` @@ -11935,7 +12179,7 @@ for example, may fail. #### RowConsumer.poll\_row ```python -def poll_row(timeout: float = None) -> Union[Row, List[Row], None] +def poll_row(timeout: Optional[float] = None) -> Union[Row, List[Row], None] ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/rowconsumer.py#L106) diff --git a/docs/api-reference/sinks.md b/docs/api-reference/sinks.md index ef7bfd98c..af8618b86 100644 --- a/docs/api-reference/sinks.md +++ b/docs/api-reference/sinks.md @@ -596,94 +596,362 @@ to the client's default retrying policy. ## quixstreams.sinks.community.file.sink - + -### InvalidFormatError +### FileSink ```python -class InvalidFormatError(Exception) +class FileSink(BatchingSink) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L24) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L11) -Raised when the format is specified incorrectly. +A sink that writes data batches to files using configurable formats and +destinations. - +The sink groups messages by their topic and partition, ensuring data from the +same source is stored together. Each batch is serialized using the specified +format (e.g., JSON, Parquet) before being written to the configured +destination. -### FileSink +The destination determines the storage location and write behavior. By default, +it uses LocalDestination for writing to the local filesystem, but can be +configured to use other storage backends (e.g., cloud storage). + + + +

+ +#### FileSink.\_\_init\_\_ ```python -class FileSink(BatchingSink) +def __init__(directory: str = "", + format: Union[FormatName, Format] = "json", + destination: Optional[Destination] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L30) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L25) -Writes batches of data to files on disk using specified formats. +Initialize the FileSink with the specified configuration. -Messages are grouped by their topic and partition. Data from messages with -the same topic and partition are saved in the same directory. Each batch of -messages is serialized and saved to a file within that directory. Files are -named using the batch's starting offset to ensure uniqueness and order. -If `append` is set to `True`, the sink will attempt to append data to an -existing file rather than creating a new one. This is only supported for -formats that allow appending. +
+***Arguments:*** - +- `directory`: Base directory path for storing files. Defaults to +current directory. +- `format`: Data serialization format, either as a string +("json", "parquet") or a Format instance. +- `destination`: Storage destination handler. Defaults to +LocalDestination if not specified. + +

-#### FileSink.\_\_init\_\_ +#### FileSink.write + +```python +def write(batch: SinkBatch) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L46) + +Write a batch of data using the configured format and destination. + +The method performs the following steps: +1. Serializes the batch data using the configured format +2. Writes the serialized data to the destination +3. Handles any write failures by raising a backpressure error + + +
+***Arguments:*** + +- `batch`: The batch of data to write. + +**Raises**: + +- `SinkBackpressureError`: If the write operation fails, indicating +that the sink needs backpressure with a 5-second retry delay. + + + +## quixstreams.sinks.community.file.destinations.base + + + +### Destination + +```python +class Destination(ABC) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L16) + +Abstract base class for defining where and how data should be stored. + +Destinations handle the storage of serialized data, whether that's to local +disk, cloud storage, or other locations. They manage the physical writing of +data while maintaining a consistent directory/path structure based on topics +and partitions. + + + +

+ +#### Destination.set\_directory + +```python +def set_directory(directory: str) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L28) + +Configure the base directory for storing files. + + +
+***Arguments:*** + +- `directory`: The base directory path where files will be stored. + +**Raises**: + +- `ValueError`: If the directory path contains invalid characters. +Only alphanumeric characters (a-zA-Z0-9), spaces, dots, and +underscores are allowed. + + + +

+ +#### Destination.set\_extension + +```python +def set_extension(format: Format) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L45) + +Set the file extension based on the format. + + +
+***Arguments:*** + +- `format`: The Format instance that defines the file extension. + + + +

+ +#### Destination.write + +```python +@abstractmethod +def write(data: bytes, batch: SinkBatch) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/base.py#L54) + +Write the serialized data to storage. + + +
+***Arguments:*** + +- `data`: The serialized data to write. +- `batch`: The batch information containing topic, partition and offset +details. + + + +## quixstreams.sinks.community.file.destinations.local + + + +### LocalDestination + +```python +class LocalDestination(Destination) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L15) + +A destination that writes data to the local filesystem. + +Handles writing data to local files with support for both creating new files +and appending to existing ones. + + + +

+ +#### LocalDestination.\_\_init\_\_ + +```python +def __init__(append: bool = False) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L22) + +Initialize the local destination. + + +
+***Arguments:*** + +- `append`: If True, append to existing files instead of creating new +ones. Defaults to False. + + + +

+ +#### LocalDestination.set\_extension ```python -def __init__(output_dir: str, - format: Union[FormatName, Format], - append: bool = False) -> None +def set_extension(format: Format) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L44) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L32) -Initializes the FileSink. +Set the file extension and validate append mode compatibility.
***Arguments:*** -- `output_dir`: The directory where files will be written. -- `format`: The data serialization format to use. This can be either a -format name ("json", "parquet") or an instance of a `Format` -subclass. -- `append`: If `True`, data will be appended to existing files when possible. -Note that not all formats support appending. Defaults to `False`. +- `format`: The Format instance that defines the file extension. **Raises**: -- `ValueError`: If `append` is `True` but the specified format does not +- `ValueError`: If append mode is enabled but the format doesn't support appending. - +

-#### FileSink.write +#### LocalDestination.write ```python -def write(batch: SinkBatch) -> None +def write(data: bytes, batch: SinkBatch) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/local.py#L43) + +Write data to a local file. + + +
+***Arguments:*** + +- `data`: The serialized data to write. +- `batch`: The batch information containing topic and partition details. + + + +## quixstreams.sinks.community.file.destinations.s3 + + + +### S3BucketNotFoundError + +```python +class S3BucketNotFoundError(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L13) + +Raised when the specified S3 bucket does not exist. + + + +### S3BucketAccessDeniedError + +```python +class S3BucketAccessDeniedError(Exception) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L17) + +Raised when the specified S3 bucket access is denied. + + + +### S3Destination + +```python +class S3Destination(Destination) +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L21) + +A destination that writes data to Amazon S3. + +Handles writing data to S3 buckets using the AWS SDK. Credentials can be +provided directly or via environment variables. + + + +

+ +#### S3Destination.\_\_init\_\_ + +```python +def __init__(bucket: str, + aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key: Optional[str] = getenv( + "AWS_SECRET_ACCESS_KEY"), + region_name: Optional[str] = getenv("AWS_REGION", + getenv("AWS_DEFAULT_REGION")), + **kwargs) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/sink.py#L68) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L28) + +Initialize the S3 destination. -Writes a batch of data to files on disk, grouping data by topic and partition. -If `append` is `True` and an existing file is found, data will be appended to -the last file. Otherwise, a new file is created based on the batch's starting -offset. +
+***Arguments:*** + +- `bucket`: Name of the S3 bucket to write to. +- `aws_access_key_id`: AWS access key ID. Defaults to AWS_ACCESS_KEY_ID +environment variable. +- `aws_secret_access_key`: AWS secret access key. Defaults to +AWS_SECRET_ACCESS_KEY environment variable. +- `region_name`: AWS region name. Defaults to AWS_REGION or +AWS_DEFAULT_REGION environment variable. +- `kwargs`: Additional keyword arguments passed to boto3.client. + +**Raises**: + +- `S3BucketNotFoundError`: If the specified bucket doesn't exist. +- `S3BucketAccessDeniedError`: If access to the bucket is denied. + + + +

+ +#### S3Destination.write + +```python +def write(data: bytes, batch: SinkBatch) -> None +``` + +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/file/destinations/s3.py#L78) + +Write data to S3.
***Arguments:*** -- `batch`: The batch of data to write. +- `data`: The serialized data to write. +- `batch`: The batch information containing topic and partition details. diff --git a/docs/api-reference/sources.md b/docs/api-reference/sources.md index c0a3dd47f..2ae4b3626 100644 --- a/docs/api-reference/sources.md +++ b/docs/api-reference/sources.md @@ -100,7 +100,7 @@ It configures the source's Kafka producer, the topic it will produce to and opti def start() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L102) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L110) This method is triggered in the subprocess when the source is started. @@ -118,7 +118,7 @@ Use it to fetch data and produce it to Kafka. def stop() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L111) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L119) This method is triggered when the application is shutting down. @@ -135,7 +135,7 @@ The source must ensure that the `run` method is completed soon. def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L119) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L127) This method is triggered when the topic is not provided to the source. @@ -149,7 +149,7 @@ The source must return a default topic configuration. class Source(BaseSource) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L127) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L135) A base class for custom Sources that provides a basic implementation of `BaseSource` interface. @@ -209,7 +209,7 @@ if __name__ == "__main__": def __init__(name: str, shutdown_timeout: float = 10) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L177) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L185)
@@ -229,7 +229,7 @@ def __init__(name: str, shutdown_timeout: float = 10) -> None def running() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L191) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L199) Property indicating if the source is running. @@ -245,7 +245,7 @@ The `stop` method will set it to `False`. Use it to stop the source gracefully. def cleanup(failed: bool) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L199) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L207) This method is triggered once the `run` method completes. @@ -263,7 +263,7 @@ It flushes the producer when `_run` completes successfully. def stop() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L210) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L218) This method is triggered when the application is shutting down. @@ -276,10 +276,10 @@ It sets the `running` property to `False`. #### Source.start ```python -def start() +def start() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L219) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L226) This method is triggered in the subprocess when the source is started. @@ -296,7 +296,7 @@ It marks the source as running, execute it's run method and ensure cleanup happe def run() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L235) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L242) This method is triggered in the subprocess when the source is started. @@ -316,7 +316,7 @@ def serialize(key: Optional[object] = None, timestamp_ms: Optional[int] = None) -> KafkaMessage ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L243) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L250) Serialize data to bytes using the producer topic serializers and return a `quixstreams.models.messages.KafkaMessage`. @@ -342,7 +342,7 @@ def produce(value: Optional[Union[str, bytes]] = None, buffer_error_max_tries: int = 3) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L259) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L266) Produce a message to the configured source topic in Kafka. @@ -356,7 +356,7 @@ Produce a message to the configured source topic in Kafka. def flush(timeout: Optional[float] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L284) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L291) This method flush the producer. @@ -383,7 +383,7 @@ None use producer default or -1 is infinite. Default: None def default_topic() -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L302) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L309) Return a default topic matching the source name. @@ -403,7 +403,7 @@ The default topic will not be used if the topic has already been provided to the class StatefulSource(Source) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L319) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L326) A `Source` class for custom Sources that need a state. @@ -463,7 +463,7 @@ if __name__ == "__main__": def __init__(name: str, shutdown_timeout: float = 10) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L369) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L376)
@@ -486,7 +486,7 @@ def configure(topic: Topic, **kwargs) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L379) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L386) This method is triggered before the source is started. @@ -503,7 +503,7 @@ It configures the source's Kafka producer, the topic it will produce to and the def store_partitions_count() -> int ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L398) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L405) Count of store partitions. @@ -520,7 +520,7 @@ Used to configure the number of partition in the changelog topic. def assigned_store_partition() -> int ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L407) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L414) The store partition assigned to this instance @@ -535,7 +535,7 @@ The store partition assigned to this instance def store_name() -> str ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L414) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L421) The source store name @@ -550,7 +550,7 @@ The source store name def state() -> State ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L421) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L428) Access the `State` of the source. @@ -568,7 +568,7 @@ Important: after each `.flush()` call, a previously returned instance is invalid def flush(timeout: Optional[float] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L440) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/base/source.py#L447) This method commit the state and flush the producer. @@ -608,7 +608,8 @@ class CSVSource(Source) ```python def __init__(path: Union[str, Path], name: str, - key_extractor: Optional[Callable[[dict], AnyStr]] = None, + key_extractor: Optional[Callable[[dict], Union[str, + bytes]]] = None, timestamp_extractor: Optional[Callable[[dict], int]] = None, delay: float = 0, shutdown_timeout: float = 10, @@ -692,18 +693,18 @@ app.run() #### KafkaReplicatorSource.\_\_init\_\_ ```python -def __init__(name: str, - app_config: "ApplicationConfig", - topic: str, - broker_address: Union[str, ConnectionConfig], - auto_offset_reset: AutoOffsetReset = "latest", - consumer_extra_config: Optional[dict] = None, - consumer_poll_timeout: Optional[float] = None, - shutdown_timeout: float = 10, - on_consumer_error: Optional[ - ConsumerErrorCallback] = default_on_consumer_error, - value_deserializer: DeserializerType = "json", - key_deserializer: DeserializerType = "bytes") -> None +def __init__( + name: str, + app_config: "ApplicationConfig", + topic: str, + broker_address: Union[str, ConnectionConfig], + auto_offset_reset: Optional[AutoOffsetReset] = "latest", + consumer_extra_config: Optional[dict] = None, + consumer_poll_timeout: Optional[float] = None, + shutdown_timeout: float = 10, + on_consumer_error: ConsumerErrorCallback = default_on_consumer_error, + value_deserializer: DeserializerType = "json", + key_deserializer: DeserializerType = "bytes") -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/core/kafka/kafka.py#L54) @@ -784,20 +785,20 @@ app.run() #### QuixEnvironmentSource.\_\_init\_\_ ```python -def __init__(name: str, - app_config: "ApplicationConfig", - topic: str, - quix_sdk_token: str, - quix_workspace_id: str, - quix_portal_api: Optional[str] = None, - auto_offset_reset: Optional[AutoOffsetReset] = None, - consumer_extra_config: Optional[dict] = None, - consumer_poll_timeout: Optional[float] = None, - shutdown_timeout: float = 10, - on_consumer_error: Optional[ - ConsumerErrorCallback] = default_on_consumer_error, - value_deserializer: DeserializerType = "json", - key_deserializer: DeserializerType = "bytes") -> None +def __init__( + name: str, + app_config: "ApplicationConfig", + topic: str, + quix_sdk_token: str, + quix_workspace_id: str, + quix_portal_api: Optional[str] = None, + auto_offset_reset: Optional[AutoOffsetReset] = None, + consumer_extra_config: Optional[dict] = None, + consumer_poll_timeout: Optional[float] = None, + shutdown_timeout: float = 10, + on_consumer_error: ConsumerErrorCallback = default_on_consumer_error, + value_deserializer: DeserializerType = "json", + key_deserializer: DeserializerType = "bytes") -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sources/core/kafka/quix.py#L50) diff --git a/docs/api-reference/topics.md b/docs/api-reference/topics.md index 821ac7284..201671c52 100644 --- a/docs/api-reference/topics.md +++ b/docs/api-reference/topics.md @@ -82,7 +82,7 @@ or a ConnectionConfig object if authentication is required. def list_topics(timeout: float = -1) -> Dict[str, ConfluentTopicMetadata] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L88) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L91) Get a list of topics and their metadata from a Kafka cluster @@ -109,7 +109,7 @@ def inspect_topics(topic_names: List[str], timeout: float = 30) -> Dict[str, Optional[TopicConfig]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L99) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L102) A simplified way of getting the topic configurations of the provided topics @@ -141,7 +141,7 @@ def create_topics(topics: List[Topic], finalize_timeout: float = 60) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L181) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/admin.py#L184) Create the given list of topics and confirm they are ready.