Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mypy: make quixstreams.state.* pass type checks #657

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ module = [
"quixstreams.dataframe.*",
"quixstreams.models.*",
"quixstreams.platforms.*",
"quixstreams.state.*",
"quixstreams.rowproducer.*"
]
ignore_errors = true

13 changes: 6 additions & 7 deletions quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import (
Any,
Callable,
Collection,
Dict,
List,
Literal,
Expand Down Expand Up @@ -36,7 +35,7 @@
VoidExecutor,
)
from quixstreams.models import (
HeaderValue,
HeadersTuples,
MessageContext,
Row,
Topic,
Expand Down Expand Up @@ -626,8 +625,8 @@ def _set_timestamp_callback(
def set_headers(
self,
func: Callable[
[Any, Any, int, List[Tuple[str, HeaderValue]]],
Collection[Tuple[str, HeaderValue]],
[Any, Any, int, HeadersTuples],
HeadersTuples,
],
) -> Self:
"""
Expand Down Expand Up @@ -663,8 +662,8 @@ def _set_headers_callback(
value: Any,
key: Any,
timestamp: int,
headers: Collection[Tuple[str, HeaderValue]],
) -> Tuple[Any, Any, int, Collection[Tuple[str, HeaderValue]]]:
headers: HeadersTuples,
) -> Tuple[Any, Any, int, HeadersTuples]:
# Create a shallow copy of original headers to prevent potential mutations
# of the same collection
headers = list(headers) if headers else []
Expand Down Expand Up @@ -1102,7 +1101,7 @@ def sink(self, sink: BaseSink):
self._processing_context.sink_manager.register(sink)

def _sink_callback(
value: Any, key: Any, timestamp: int, headers: List[Tuple[str, HeaderValue]]
value: Any, key: Any, timestamp: int, headers: HeadersTuples
):
ctx = message_context()
sink.add(
Expand Down
2 changes: 1 addition & 1 deletion quixstreams/models/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class KafkaMessage:

def __init__(
self,
key: Optional[MessageKey],
key: MessageKey,
value: Optional[MessageValue],
headers: Optional[Headers],
timestamp: Optional[int] = None,
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/models/rows.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Optional

from .messagecontext import MessageContext
from .types import MessageHeadersTuples
from .types import KafkaHeaders


class Row:
Expand All @@ -19,7 +19,7 @@ def __init__(
key: Optional[Any],
timestamp: int,
context: MessageContext,
headers: Optional[MessageHeadersTuples] = None,
headers: KafkaHeaders = None,
gwaramadze marked this conversation as resolved.
Show resolved Hide resolved
):
self.value = value
self.key = key
Expand Down
8 changes: 4 additions & 4 deletions quixstreams/models/serializers/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import Any, Optional, Union
from typing import Any, Union

from confluent_kafka.serialization import (
MessageField,
Expand All @@ -9,7 +9,7 @@
)
from typing_extensions import Literal, TypeAlias

from ..types import MessageHeadersMapping, MessageHeadersTuples
from ..types import HeadersMapping, KafkaHeaders

__all__ = (
"SerializationContext",
Expand All @@ -34,7 +34,7 @@ def __init__(
self,
topic: str,
field: MessageField,
headers: Optional[MessageHeadersTuples] = None,
headers: KafkaHeaders = None,
) -> None:
self.topic = topic
self.field = field
Expand Down Expand Up @@ -65,7 +65,7 @@ class Serializer(abc.ABC):
"""

@property
def extra_headers(self) -> MessageHeadersMapping:
def extra_headers(self) -> HeadersMapping:
"""
Informs producer to set additional headers
for the message it will be serializing
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/models/serializers/quix.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import gzip
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union

from quixstreams.models.types import MessageHeadersMapping
from quixstreams.models.types import HeadersMapping
from quixstreams.utils.json import (
dumps as default_dumps,
)
Expand Down Expand Up @@ -290,7 +290,7 @@ def __init__(
self._as_legacy = as_legacy

@property
def extra_headers(self) -> MessageHeadersMapping:
def extra_headers(self) -> HeadersMapping:
# Legacy-formatted messages should not contain any headers
return self._extra_headers if not self._as_legacy else {}

Expand Down
4 changes: 2 additions & 2 deletions quixstreams/models/topics/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
from quixstreams.models.types import (
ConfluentKafkaMessageProto,
Headers,
MessageHeadersTuples,
KafkaHeaders,
)

__all__ = ("Topic", "TopicConfig", "TimestampExtractor")

logger = logging.getLogger(__name__)

TimestampExtractor = Callable[
[Any, Optional[MessageHeadersTuples], int, TimestampType],
[Any, KafkaHeaders, int, TimestampType],
int,
]

Expand Down
12 changes: 6 additions & 6 deletions quixstreams/models/topics/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from typing import Optional

from quixstreams.models.types import MessageHeadersMapping, MessageHeadersTuples
from quixstreams.models.types import (
HeadersMapping,
HeadersTuples,
KafkaHeaders,
)

__all__ = ("merge_headers",)


def merge_headers(
original: Optional[MessageHeadersTuples], other: MessageHeadersMapping
) -> MessageHeadersTuples:
def merge_headers(original: KafkaHeaders, other: HeadersMapping) -> HeadersTuples:
"""
Merge two sets of Kafka message headers, overwriting headers in "origin"
by the values from "other".
Expand Down
21 changes: 12 additions & 9 deletions quixstreams/models/types.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from typing import Dict, List, Optional, Tuple, Union
from typing import List, Mapping, Optional, Sequence, Tuple, Union

from typing_extensions import Protocol

MessageKey = Union[str, bytes]
MessageKey = Optional[Union[str, bytes]]
gwaramadze marked this conversation as resolved.
Show resolved Hide resolved
MessageValue = Union[str, bytes]
HeaderValue = Optional[Union[str, bytes]]
MessageHeadersTuples = List[Tuple[str, HeaderValue]]
MessageHeadersMapping = Dict[str, HeaderValue]
Headers = Union[MessageHeadersTuples, MessageHeadersMapping]

HeadersValue = Union[str, bytes]
HeadersMapping = Mapping[str, HeadersValue]
HeadersTuples = Sequence[Tuple[str, HeadersValue]]
Headers = Union[HeadersTuples, HeadersMapping]

KafkaHeaders = Optional[List[Tuple[str, bytes]]]


class ConfluentKafkaMessageProto(Protocol):
Expand All @@ -21,13 +24,13 @@ class ConfluentKafkaMessageProto(Protocol):

"""

def headers(self, *args, **kwargs) -> Optional[MessageHeadersTuples]: ...
def headers(self, *args, **kwargs) -> KafkaHeaders: ...

def key(self, *args, **kwargs) -> Optional[MessageKey]: ...
def key(self, *args, **kwargs) -> MessageKey: ...

def offset(self, *args, **kwargs) -> int: ...

def partition(self, *args, **kwargs) -> int: ...
def partition(self, *args, **kwargs) -> Optional[int]: ...

def timestamp(self, *args, **kwargs) -> Tuple[int, int]: ...

Expand Down
6 changes: 3 additions & 3 deletions quixstreams/sinks/base/batch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from collections import deque
from itertools import islice
from typing import Any, Deque, Iterable, Iterator, List, Tuple
from typing import Any, Deque, Iterable, Iterator

from quixstreams.models import HeaderValue
from quixstreams.models import HeadersTuples

from .item import SinkItem

Expand Down Expand Up @@ -47,7 +47,7 @@ def append(
value: Any,
key: Any,
timestamp: int,
headers: List[Tuple[str, HeaderValue]],
headers: HeadersTuples,
offset: int,
):
self._buffer.append(
Expand Down
6 changes: 3 additions & 3 deletions quixstreams/sinks/base/item.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, List, Tuple
from typing import Any

from quixstreams.models import HeaderValue
from quixstreams.models import HeadersTuples

__all__ = ("SinkItem",)

Expand All @@ -19,7 +19,7 @@ def __init__(
value: Any,
key: Any,
timestamp: int,
headers: List[Tuple[str, HeaderValue]],
headers: HeadersTuples,
offset: int,
):
self.key = key
Expand Down
8 changes: 4 additions & 4 deletions quixstreams/sinks/base/sink.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import abc
import logging
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, Tuple

from quixstreams.models import HeaderValue
from quixstreams.models import HeadersTuples
from quixstreams.sinks.base.batch import SinkBatch

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,7 +35,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: List[Tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down Expand Up @@ -95,7 +95,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: List[Tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
) from exc

from quixstreams.exceptions import QuixException
from quixstreams.models import HeaderValue
from quixstreams.models import HeadersTuples
from quixstreams.sinks import BatchingSink, SinkBatch

__all__ = ("BigQuerySink", "BigQuerySinkException")
Expand Down Expand Up @@ -168,7 +168,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: list[tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
'run "pip install quixstreams[kinesis]" to use KinesisSink'
) from exc

from quixstreams.models.types import HeaderValue
from quixstreams.models.types import HeadersTuples
from quixstreams.sinks.base import BaseSink
from quixstreams.sinks.base.exceptions import SinkBackpressureError

Expand Down Expand Up @@ -82,7 +82,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: list[tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
) from exc

from quixstreams.exceptions import QuixException
from quixstreams.models import HeaderValue
from quixstreams.models import HeadersTuples
from quixstreams.sinks import BatchingSink, SinkBatch

__all__ = ("PostgreSQLSink", "PostgreSQLSinkException")
Expand Down Expand Up @@ -115,7 +115,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: list[tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/sinks/community/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
'run "pip install quixstreams[pubsub]" to use PubSubSink'
) from exc

from quixstreams.models.types import HeaderValue
from quixstreams.models.types import HeadersTuples
from quixstreams.sinks.base import BaseSink, SinkBackpressureError

__all__ = ("PubSubSink", "PubSubTopicNotFoundError")
Expand Down Expand Up @@ -83,7 +83,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: list[tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down
6 changes: 3 additions & 3 deletions quixstreams/sinks/core/influxdb3.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
import sys
import time
from typing import Any, Iterable, List, Mapping, Optional, Tuple
from typing import Any, Iterable, Mapping, Optional

from quixstreams.models import HeaderValue
from quixstreams.models import HeadersTuples

try:
import influxdb_client_3
Expand Down Expand Up @@ -127,7 +127,7 @@ def add(
value: Any,
key: Any,
timestamp: int,
headers: List[Tuple[str, HeaderValue]],
headers: HeadersTuples,
topic: str,
partition: int,
offset: int,
Expand Down
Loading