diff --git a/docs/api-reference/dataframe.md b/docs/api-reference/dataframe.md index e59c21d4b..3655dc0b4 100644 --- a/docs/api-reference/dataframe.md +++ b/docs/api-reference/dataframe.md @@ -10,7 +10,7 @@ class StreamingDataFrame(BaseStreaming) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L69) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L68) `StreamingDataFrame` is the main object you will use for ETL work. @@ -81,7 +81,7 @@ def apply(func: Union[ metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L175) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L174) Apply a function to transform the value and return a new value. @@ -139,7 +139,7 @@ def update(func: Union[ metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L264) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L263) Apply a function to mutate value in-place or to perform a side effect @@ -207,7 +207,7 @@ def filter(func: Union[ metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L356) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L355) Filter value using provided function. @@ -259,7 +259,7 @@ def group_by(key: Union[str, Callable[[Any], Any]], key_serializer: Optional[SerializerType] = "json") -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L442) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L441) "Groups" messages by re-keying them via the provided group_by operation @@ -323,7 +323,7 @@ a clone with this operation added (assign to keep its effect). def contains(key: str) -> StreamingSeries ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L515) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L514) Check if the key is present in the Row value. @@ -362,7 +362,7 @@ or False otherwise. def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L540) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L539) Produce current value to a topic. You can optionally specify a new key. @@ -415,7 +415,7 @@ the updated StreamingDataFrame instance (reassignment NOT required). def set_timestamp(func: Callable[[Any, Any, int, Any], int]) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L585) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L584) Set a new timestamp based on the current message value and its metadata. @@ -461,14 +461,13 @@ a new StreamingDataFrame instance ```python def set_headers( - func: Callable[ - [Any, Any, int, List[Tuple[str, HeaderValue]]], - Collection[Tuple[str, HeaderValue]], - ] -) -> Self + func: Callable[ + [Any, Any, int, HeadersTuples], + HeadersTuples, + ]) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L626) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L625) Set new message headers based on the current message value and metadata. @@ -517,7 +516,7 @@ a new StreamingDataFrame instance def print(pretty: bool = True, metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L677) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L676) Print out the current message value (and optionally, the message metadata) to @@ -573,7 +572,7 @@ def compose( ) -> Dict[str, VoidExecutor] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L719) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L718) Compose all functions of this StreamingDataFrame into one big closure. @@ -627,7 +626,7 @@ def test(value: Any, topic: Optional[Topic] = None) -> List[Any] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L753) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L752) A shorthand to test `StreamingDataFrame` with provided value @@ -664,7 +663,7 @@ def tumbling_window(duration_ms: Union[int, timedelta], name: Optional[str] = None) -> TumblingWindowDefinition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L790) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L789) Create a tumbling window transformation on this StreamingDataFrame. @@ -750,7 +749,7 @@ def hopping_window(duration_ms: Union[int, timedelta], name: Optional[str] = None) -> HoppingWindowDefinition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L866) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L865) Create a hopping window transformation on this StreamingDataFrame. @@ -844,7 +843,7 @@ def sliding_window(duration_ms: Union[int, timedelta], name: Optional[str] = None) -> SlidingWindowDefinition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L958) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L957) Create a sliding window transformation on this StreamingDataFrame. @@ -933,7 +932,7 @@ def drop(columns: Union[str, List[str]], errors: Literal["ignore", "raise"] = "raise") -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1039) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1038) Drop column(s) from the message value (value must support `del`, like a dict). @@ -977,7 +976,7 @@ a new StreamingDataFrame instance def sink(sink: BaseSink) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1083) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1082) Sink the processed data to the specified destination. diff --git a/docs/api-reference/quixstreams.md b/docs/api-reference/quixstreams.md index 1b69f0436..6826ada4b 100644 --- a/docs/api-reference/quixstreams.md +++ b/docs/api-reference/quixstreams.md @@ -766,7 +766,7 @@ a {topic_name: composed} dict, where composed is a callable class StreamingDataFrame(BaseStreaming) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L69) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L68) `StreamingDataFrame` is the main object you will use for ETL work. @@ -829,7 +829,7 @@ def apply(func: Union[ metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L175) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L174) Apply a function to transform the value and return a new value. @@ -881,7 +881,7 @@ def update(func: Union[ metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L264) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L263) Apply a function to mutate value in-place or to perform a side effect @@ -941,7 +941,7 @@ def filter(func: Union[ metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L356) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L355) Filter value using provided function. @@ -987,7 +987,7 @@ def group_by(key: Union[str, Callable[[Any], Any]], key_serializer: Optional[SerializerType] = "json") -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L442) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L441) "Groups" messages by re-keying them via the provided group_by operation @@ -1043,7 +1043,7 @@ a clone with this operation added (assign to keep its effect). def contains(key: str) -> StreamingSeries ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L515) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L514) Check if the key is present in the Row value. @@ -1074,7 +1074,7 @@ or False otherwise. def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L540) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L539) Produce current value to a topic. You can optionally specify a new key. @@ -1119,7 +1119,7 @@ the updated StreamingDataFrame instance (reassignment NOT required). def set_timestamp(func: Callable[[Any, Any, int, Any], int]) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L585) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L584) Set a new timestamp based on the current message value and its metadata. @@ -1157,14 +1157,13 @@ a new StreamingDataFrame instance ```python def set_headers( - func: Callable[ - [Any, Any, int, List[Tuple[str, HeaderValue]]], - Collection[Tuple[str, HeaderValue]], - ] -) -> Self + func: Callable[ + [Any, Any, int, HeadersTuples], + HeadersTuples, + ]) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L626) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L625) Set new message headers based on the current message value and metadata. @@ -1205,7 +1204,7 @@ a new StreamingDataFrame instance def print(pretty: bool = True, metadata: bool = False) -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L677) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L676) Print out the current message value (and optionally, the message metadata) to @@ -1253,7 +1252,7 @@ def compose( ) -> Dict[str, VoidExecutor] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L719) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L718) Compose all functions of this StreamingDataFrame into one big closure. @@ -1299,7 +1298,7 @@ def test(value: Any, topic: Optional[Topic] = None) -> List[Any] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L753) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L752) A shorthand to test `StreamingDataFrame` with provided value @@ -1330,7 +1329,7 @@ def tumbling_window(duration_ms: Union[int, timedelta], name: Optional[str] = None) -> TumblingWindowDefinition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L790) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L789) Create a tumbling window transformation on this StreamingDataFrame. @@ -1407,7 +1406,7 @@ def hopping_window(duration_ms: Union[int, timedelta], name: Optional[str] = None) -> HoppingWindowDefinition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L866) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L865) Create a hopping window transformation on this StreamingDataFrame. @@ -1492,7 +1491,7 @@ def sliding_window(duration_ms: Union[int, timedelta], name: Optional[str] = None) -> SlidingWindowDefinition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L958) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L957) Create a sliding window transformation on this StreamingDataFrame. @@ -1572,7 +1571,7 @@ def drop(columns: Union[str, List[str]], errors: Literal["ignore", "raise"] = "raise") -> Self ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1039) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1038) Drop column(s) from the message value (value must support `del`, like a dict). @@ -1608,7 +1607,7 @@ a new StreamingDataFrame instance def sink(sink: BaseSink) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1083) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1082) Sink the processed data to the specified destination. @@ -4163,9 +4162,8 @@ Initialize the KinesisSink. #### KinesisSink.add ```python -def add(value: Any, key: Any, timestamp: int, - headers: list[tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) -> None +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L80) @@ -4445,9 +4443,8 @@ Default - `None`. #### PubSubSink.add ```python -def add(value: Any, key: Any, timestamp: int, - headers: list[tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) -> None +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/pubsub.py#L81) @@ -4511,9 +4508,8 @@ If flush() fails, the checkpoint will be aborted. ```python @abc.abstractmethod -def add(value: Any, key: Any, timestamp: int, - headers: List[Tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/base/sink.py#L33) @@ -4583,9 +4579,8 @@ corresponding topic partition. #### BatchingSink.add ```python -def add(value: Any, key: Any, timestamp: int, - headers: List[Tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/base/sink.py#L93) @@ -4924,7 +4919,7 @@ it should not be mutated during message processing. class ConfluentKafkaMessageProto(Protocol) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/types.py#L13) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/types.py#L16) An interface of `confluent_kafka.Message`. @@ -5677,7 +5672,7 @@ A base class for all Serializers ```python @property -def extra_headers() -> MessageHeadersMapping +def extra_headers() -> HeadersMapping ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/serializers/base.py#L68) @@ -5843,11 +5838,11 @@ fail (it ignores issues for a topic already existing). #### merge\_headers ```python -def merge_headers(original: Optional[MessageHeadersTuples], - other: MessageHeadersMapping) -> MessageHeadersTuples +def merge_headers(original: KafkaHeaders, + other: HeadersMapping) -> HeadersTuples ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/utils.py#L8) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/models/topics/utils.py#L10) Merge two sets of Kafka message headers, overwriting headers in "origin" @@ -6364,7 +6359,7 @@ def expire_windows(max_start_time: int, delete: bool = True) -> list[tuple[tuple[int, int], Any]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/transaction.py#L120) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/transaction.py#L128) Get all expired windows from RocksDB up to the specified `max_start_time` timestamp. @@ -6398,7 +6393,7 @@ A sorted list of tuples in the format `((start, end), value)`. def delete_windows(max_start_time: int, prefix: bytes) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/transaction.py#L177) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/transaction.py#L185) Delete windows from RocksDB up to the specified `max_start_time` timestamp. @@ -6430,7 +6425,7 @@ def get_windows(start_from_ms: int, backwards: bool = False) -> list[tuple[tuple[int, int], Any]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/transaction.py#L224) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/transaction.py#L232) Get all windows that start between "start_from_ms" and "start_to_ms" @@ -6619,7 +6614,7 @@ using the provided `timestamp`. #### WindowedTransactionState.get\_latest\_timestamp ```python -def get_latest_timestamp() -> int +def get_latest_timestamp() -> Optional[int] ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/windowed/state.py#L69) @@ -6770,13 +6765,13 @@ partitions' transactions. ```python def __init__( name: str, - topic: str, + topic: Optional[str], base_dir: str, changelog_producer_factory: Optional[ChangelogProducerFactory] = None, - options: Optional[options_type] = None) + options: Optional[RocksDBOptionsType] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/store.py#L28) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/store.py#L26) **Arguments**: @@ -6829,7 +6824,7 @@ def write(cache: PartitionTransactionCache, batch: Optional[WriteBatch] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L112) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L115) Write data to RocksDB @@ -6846,11 +6841,10 @@ Write data to RocksDB ```python def get(key: bytes, - default: Any = None, - cf_name: str = "default") -> Union[None, bytes, Any] + cf_name: str = "default") -> Union[bytes, Literal[Marker.UNDEFINED]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L177) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L180) Get a key from RocksDB. @@ -6872,7 +6866,7 @@ a value if the key is present in the DB. Otherwise, `default` def exists(key: bytes, cf_name: str = "default") -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L191) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L196) Check if a key is present in the DB. @@ -6893,7 +6887,7 @@ Check if a key is present in the DB. def get_processed_offset() -> Optional[int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L202) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L207) Get last processed offset for the given partition @@ -6909,7 +6903,7 @@ offset or `None` if there's no processed offset yet def get_changelog_offset() -> Optional[int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L214) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L222) Get offset that the changelog is up-to-date with. @@ -6925,7 +6919,7 @@ offset or `None` if there's no processed offset yet def close() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L224) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L234) Close the underlying RocksDB @@ -6938,7 +6932,7 @@ Close the underlying RocksDB def path() -> str ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L237) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L247) Absolute path to RocksDB database folder @@ -6955,7 +6949,7 @@ file path def destroy(cls, path: str) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L245) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L255) Delete underlying RocksDB database @@ -6973,7 +6967,7 @@ The database must be closed first. def get_column_family_handle(cf_name: str) -> ColumnFamily ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L255) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L265) Get a column family handle to pass to it WriteBatch. @@ -6996,7 +6990,7 @@ instance of `rocksdict.ColumnFamily` def get_column_family(cf_name: str) -> Rdict ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L276) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/rocksdb/partition.py#L286) Get a column family instance. @@ -7058,7 +7052,7 @@ Requires a full state recovery for each partition on assignment. ```python def __init__( name: str, - topic: str, + topic: Optional[str], changelog_producer_factory: Optional[ChangelogProducerFactory] = None ) -> None ``` @@ -7121,7 +7115,7 @@ Write data to the state def get_processed_offset() -> Optional[int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L122) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L125) Get last processed offset for the given partition @@ -7137,7 +7131,7 @@ offset or `None` if there's no processed offset yet def get_changelog_offset() -> Optional[int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L129) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L132) Get offset that the changelog is up-to-date with. @@ -7152,11 +7146,10 @@ offset or `None` if there's no processed offset yet ```python @_validate_partition_state() def get(key: bytes, - default: Any = None, - cf_name: str = "default") -> Union[None, bytes, Any] + cf_name: str = "default") -> Union[bytes, Literal[Marker.UNDEFINED]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L137) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L140) Get a key from the store @@ -7179,7 +7172,7 @@ a value if the key is present in the store. Otherwise, `default` def exists(key: bytes, cf_name: str = "default") -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L151) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/memory/partition.py#L154) Check if a key is present in the store. @@ -7224,7 +7217,7 @@ Since `StorePartition`s do recovery directly, it also handles recovery transacti def offset() -> int ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L62) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L64) Get the changelog offset from the underlying `StorePartition`. @@ -7241,7 +7234,7 @@ changelog offset (int) def needs_recovery_check() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L78) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L80) Determine whether to attempt recovery for underlying `StorePartition`. @@ -7256,7 +7249,7 @@ This does NOT mean that anything actually requires recovering. def has_invalid_offset() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L89) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L91) Determine if the current changelog offset stored in state is invalid. @@ -7269,7 +7262,7 @@ def recover_from_changelog_message( changelog_message: ConfluentKafkaMessageProto) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L103) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L108) Recover the StorePartition using a message read from its respective changelog. @@ -7277,23 +7270,6 @@ Recover the StorePartition using a message read from its respective changelog. - `changelog_message`: A confluent kafka message (everything as bytes) - - -#### RecoveryPartition.set\_watermarks - -```python -def set_watermarks(lowwater: int, highwater: int) -``` - -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L115) - -Set the changelog watermarks as gathered from Consumer.get_watermark_offsets() - -**Arguments**: - -- `lowwater`: topic partition lowwater -- `highwater`: topic partition highwater - #### RecoveryPartition.set\_recovery\_consume\_position @@ -7302,7 +7278,7 @@ Set the changelog watermarks as gathered from Consumer.get_watermark_offsets() def set_recovery_consume_position(offset: int) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L125) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L120) Update the recovery partition with the consumer's position (whenever @@ -7322,7 +7298,7 @@ It is possible that it may be set more than once. class ChangelogProducerFactory() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L137) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L132) Generates ChangelogProducers, which produce changelog messages to a StorePartition. @@ -7334,7 +7310,7 @@ Generates ChangelogProducers, which produce changelog messages to a StorePartiti def __init__(changelog_name: str, producer: RowProducer) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L142) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L137) **Arguments**: @@ -7353,7 +7329,7 @@ a ChangelogWriter instance def get_partition_producer(partition_num) -> "ChangelogProducer" ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L152) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L147) Generate a ChangelogProducer for producing to a specific partition number @@ -7371,7 +7347,7 @@ Generate a ChangelogProducer for producing to a specific partition number class ChangelogProducer() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L166) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L161) Generated for a `StorePartition` to produce state changes to its respective kafka changelog partition. @@ -7384,7 +7360,7 @@ kafka changelog partition. def __init__(changelog_name: str, partition: int, producer: RowProducer) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L172) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L167) **Arguments**: @@ -7399,10 +7375,10 @@ def __init__(changelog_name: str, partition: int, producer: RowProducer) ```python def produce(key: bytes, value: Optional[bytes] = None, - headers: Optional[MessageHeadersMapping] = None) + headers: Optional[Headers] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L195) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L190) Produce a message to a changelog topic partition. @@ -7420,7 +7396,7 @@ Produce a message to a changelog topic partition. class RecoveryManager() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L220) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L215) Manages all consumer-related aspects of recovery, including: - assigning/revoking, pausing/resuming topic partitions (especially changelogs) @@ -7440,7 +7416,7 @@ Recovery is attempted from the `Application` after any new partition assignment. def partitions() -> Dict[int, Dict[str, RecoveryPartition]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L239) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L234) Returns a mapping of assigned RecoveryPartitions in the following format: {: {: }} @@ -7454,7 +7430,7 @@ Returns a mapping of assigned RecoveryPartitions in the following format: def has_assignments() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L247) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L242) Whether the Application has assigned RecoveryPartitions @@ -7471,7 +7447,7 @@ has assignments, as bool def recovering() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L256) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L251) Whether the Application is currently recovering @@ -7489,7 +7465,7 @@ def register_changelog(topic_name: Optional[str], topic_config: Optional[TopicConfig] = None) -> Topic ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L264) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L259) Register a changelog Topic with the TopicManager. @@ -7506,7 +7482,7 @@ Register a changelog Topic with the TopicManager. def do_recovery() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L282) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L277) If there are any active RecoveryPartitions, do a recovery procedure. @@ -7522,7 +7498,7 @@ def assign_partition(topic: Optional[str], partition: int, store_partitions: Dict[str, StorePartition]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L335) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L332) Assigns `StorePartition`s (as `RecoveryPartition`s) ONLY IF recovery required. @@ -7536,7 +7512,7 @@ Pauses active consumer partitions as needed. def revoke_partition(partition_num: int) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L412) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/recovery.py#L409) revoke ALL StorePartitions (across all Stores) for a given partition number @@ -7622,7 +7598,7 @@ using the provided `timestamp`. #### WindowedState.get\_latest\_timestamp ```python -def get_latest_timestamp() -> int +def get_latest_timestamp() -> Optional[int] ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/types.py#L48) @@ -8042,7 +8018,7 @@ StateStoreManager is responsible for: def stores() -> Dict[Optional[str], Dict[str, Store]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L73) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L79) Map of registered state stores @@ -8059,7 +8035,7 @@ dict in format {topic: {store_name: store}} def recovery_required() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L81) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L87) Whether recovery needs to be done. @@ -8072,7 +8048,7 @@ Whether recovery needs to be done. def using_changelogs() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L90) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L96) Whether the StateStoreManager is using changelog topics @@ -8085,10 +8061,10 @@ using changelogs, as bool #### StateStoreManager.do\_recovery ```python -def do_recovery() +def do_recovery() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L102) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L108) Perform a state recovery, if necessary. @@ -8097,10 +8073,10 @@ Perform a state recovery, if necessary. #### StateStoreManager.stop\_recovery ```python -def stop_recovery() +def stop_recovery() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L108) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L117) Stop recovery (called during app shutdown). @@ -8112,7 +8088,7 @@ Stop recovery (called during app shutdown). def get_store(topic: str, store_name: str = DEFAULT_STATE_STORE_NAME) -> Store ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L114) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L126) Get a store for given name and topic @@ -8133,10 +8109,10 @@ instance of `Store` def register_store(topic_name: Optional[str], store_name: str = DEFAULT_STATE_STORE_NAME, store_type: Optional[StoreTypes] = None, - topic_config: Optional[TopicConfig] = None) + topic_config: Optional[TopicConfig] = None) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L149) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L163) Register a state store to be managed by StateStoreManager. @@ -8157,10 +8133,10 @@ Default to StateStoreManager `default_store_type` #### StateStoreManager.register\_windowed\_store ```python -def register_windowed_store(topic_name: str, store_name: str) +def register_windowed_store(topic_name: str, store_name: str) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L194) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L208) Register a windowed state store to be managed by StateStoreManager. @@ -8179,10 +8155,10 @@ Each window store can be registered only once for each topic. #### StateStoreManager.clear\_stores ```python -def clear_stores() +def clear_stores() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L220) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L234) Delete all state stores managed by StateStoreManager. @@ -8195,7 +8171,7 @@ def on_partition_assign(topic: Optional[str], partition: int, committed_offset: int) -> Dict[str, StorePartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L235) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L250) Assign store partitions for each registered store for the given `TopicPartition` @@ -8216,10 +8192,10 @@ list of assigned `StorePartition` #### StateStoreManager.on\_partition\_revoke ```python -def on_partition_revoke(topic: str, partition: int) +def on_partition_revoke(topic: str, partition: int) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L261) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L276) Revoke store partitions for each registered store for the given `TopicPartition` @@ -8233,10 +8209,10 @@ Revoke store partitions for each registered store for the given `TopicPartition` #### StateStoreManager.init ```python -def init() +def init() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L274) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L289) Initialize `StateStoreManager` and create a store directory @@ -8246,10 +8222,10 @@ Initialize `StateStoreManager` and create a store directory #### StateStoreManager.close ```python -def close() +def close() -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L281) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/manager.py#L296) Close all registered stores @@ -8282,10 +8258,10 @@ partitions' transactions. ```python @property -def topic() -> str +def topic() -> Optional[str] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L37) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L35) Topic name @@ -8298,7 +8274,7 @@ Topic name def name() -> str ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L44) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L42) Store name @@ -8311,7 +8287,7 @@ Store name def partitions() -> Dict[int, StorePartition] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L51) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L49) Mapping of assigned store partitions @@ -8327,7 +8303,7 @@ dict of "{partition: }" def assign_partition(partition: int) -> StorePartition ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L58) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L56) Assign new store partition @@ -8347,7 +8323,7 @@ instance of `StorePartition` def revoke_partition(partition: int) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L85) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L83) Revoke assigned store partition @@ -8363,7 +8339,7 @@ Revoke assigned store partition def start_partition_transaction(partition: int) -> PartitionTransaction ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L103) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L101) Start a new partition transaction. @@ -8385,7 +8361,7 @@ instance of `PartitionTransaction` def close() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L121) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/store.py#L119) Close store and revoke all store partitions @@ -8401,7 +8377,7 @@ Close store and revoke all store partitions class StorePartition(ABC) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L25) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L26) A base class to access state in the underlying storage. It represents a single instance of some storage (e.g. a single database for @@ -8416,7 +8392,7 @@ the persistent storage). def get_processed_offset() -> Optional[int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L56) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L57) Get last processed offset for the given partition @@ -8433,7 +8409,7 @@ offset or `None` if there's no processed offset yet def get_changelog_offset() -> Optional[int] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L64) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L65) Get offset that the changelog is up-to-date with. @@ -8451,7 +8427,7 @@ def write(cache: PartitionTransactionCache, processed_offset: Optional[int], changelog_offset: Optional[int]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L72) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L73) Update the state with data from the update cache @@ -8468,11 +8444,10 @@ Update the state with data from the update cache ```python @abstractmethod def get(key: bytes, - default: Any = None, - cf_name: str = "default") -> Union[None, bytes, Any] + cf_name: str = "default") -> Union[bytes, Literal[Marker.UNDEFINED]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L87) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L88) Get a key from the store @@ -8495,7 +8470,7 @@ a value if the key is present in the store. Otherwise, `default` def exists(key: bytes, cf_name: str = "default") -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L100) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L101) Check if a key is present in the store. @@ -8516,7 +8491,7 @@ Check if a key is present in the store. def begin() -> PartitionTransaction ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L109) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L110) Start a new `PartitionTransaction` @@ -8528,10 +8503,11 @@ Using `PartitionTransaction` is a recommended way for accessing the data. ```python def recover_from_changelog_message( - changelog_message: ConfluentKafkaMessageProto, committed_offset: int) + changelog_message: ConfluentKafkaMessageProto, + committed_offset: int) -> None ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L122) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/partition.py#L123) Updates state from a given changelog message. @@ -8552,7 +8528,7 @@ Updates state from a given changelog message. class PartitionTransactionCache() ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L37) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L36) A cache with the data updated in the current PartitionTransaction. It is used to read-your-own-writes before the transaction is committed to the Store. @@ -8567,10 +8543,10 @@ to simplify the querying over them. ```python def get(key: bytes, prefix: bytes, - cf_name: str = "default") -> Union[bytes, Undefined] + cf_name: str = "default") -> Union[bytes, Marker] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L60) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L59) Get a value for the key. @@ -8594,7 +8570,7 @@ If the key is not present in the cache, returns "UNDEFINED sentinel def set(key: bytes, value: bytes, prefix: bytes, cf_name: str = "default") ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L90) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L89) Set a value for the key. @@ -8612,7 +8588,7 @@ Set a value for the key. def delete(key: Any, prefix: bytes, cf_name: str = "default") ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L103) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L102) Delete a key. @@ -8630,7 +8606,7 @@ Delete a key. def is_empty() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L116) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L115) Return True if any changes have been made (updates or deletes), otherwise return False. @@ -8643,7 +8619,7 @@ return False. def get_column_families() -> Set[str] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L123) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L122) Get all update column families. @@ -8655,7 +8631,7 @@ Get all update column families. def get_updates(cf_name: str = "default") -> Dict[bytes, Dict[bytes, bytes]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L129) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L128) Get all updated keys (excluding deleted) @@ -8672,7 +8648,7 @@ in the format "{: {: }}". def get_deletes(cf_name: str = "default") -> Set[bytes] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L138) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L137) Get all deleted keys (excluding updated) as a set. @@ -8684,7 +8660,7 @@ Get all deleted keys (excluding updated) as a set. class PartitionTransactionStatus(enum.Enum) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L145) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L144) @@ -8718,7 +8694,7 @@ Transaction is failed, it cannot be used anymore def validate_transaction_status(*allowed: PartitionTransactionStatus) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L156) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L155) Check that the status of `RocksDBTransaction` is valid before calling a method @@ -8730,7 +8706,7 @@ Check that the status of `RocksDBTransaction` is valid before calling a method class PartitionTransaction(ABC) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L176) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L175) A transaction class to perform simple key-value operations like "get", "set", "delete" and "exists" on a single storage partition. @@ -8744,7 +8720,7 @@ A transaction class to perform simple key-value operations like def failed() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L208) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L207) Return `True` if transaction failed to update data at some point. @@ -8763,7 +8739,7 @@ bool def completed() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L218) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L217) Return `True` if transaction is successfully completed. @@ -8782,7 +8758,7 @@ bool def prepared() -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L228) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L227) Return `True` if transaction is prepared completed. @@ -8801,7 +8777,7 @@ bool def changelog_topic_partition() -> Optional[Tuple[str, int]] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L238) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L237) Return the changelog topic-partition for the StorePartition of this transaction. @@ -8819,7 +8795,7 @@ Returns `None` if changelog_producer is not provided. def as_state(prefix: Any = DEFAULT_PREFIX) -> State ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L263) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L264) Create an instance implementing the `State` protocol to be provided @@ -8843,7 +8819,7 @@ def get(key: Any, cf_name: str = "default") -> Optional[Any] ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L282) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L283) Get a key from the store. @@ -8869,7 +8845,7 @@ value or None if the key is not found and `default` is not provided def set(key: Any, value: Any, prefix: bytes, cf_name: str = "default") ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L318) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L319) Set value for the key. @@ -8889,7 +8865,7 @@ Set value for the key. def delete(key: Any, prefix: bytes, cf_name: str = "default") ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L341) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L342) Delete value for the key. @@ -8910,7 +8886,7 @@ This function always returns `None`, even if value is not found. def exists(key: Any, prefix: bytes, cf_name: str = "default") -> bool ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L360) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L361) Check if the key exists in state. @@ -8933,7 +8909,7 @@ True if key exists, False otherwise def prepare(processed_offset: Optional[int]) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L380) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L381) Produce changelog messages to the changelog topic for all changes accumulated @@ -8961,7 +8937,7 @@ def flush(processed_offset: Optional[int] = None, changelog_offset: Optional[int] = None) ``` -[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L441) +[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/state/base/transaction.py#L442) Flush the recent updates to the database. diff --git a/docs/api-reference/sinks.md b/docs/api-reference/sinks.md index af8618b86..dfffbeaaa 100644 --- a/docs/api-reference/sinks.md +++ b/docs/api-reference/sinks.md @@ -47,9 +47,8 @@ If flush() fails, the checkpoint will be aborted. ```python @abc.abstractmethod -def add(value: Any, key: Any, timestamp: int, - headers: List[Tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/base/sink.py#L33) @@ -125,9 +124,8 @@ corresponding topic partition. #### BatchingSink.add ```python -def add(value: Any, key: Any, timestamp: int, - headers: List[Tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/base/sink.py#L93) @@ -1320,9 +1318,8 @@ Default - `None`. #### PubSubSink.add ```python -def add(value: Any, key: Any, timestamp: int, - headers: list[tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) -> None +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/pubsub.py#L81) @@ -1461,9 +1458,8 @@ Initialize the KinesisSink. #### KinesisSink.add ```python -def add(value: Any, key: Any, timestamp: int, - headers: list[tuple[str, HeaderValue]], topic: str, partition: int, - offset: int) -> None +def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples, + topic: str, partition: int, offset: int) -> None ``` [[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/sinks/community/kinesis.py#L80)