Skip to content

Commit

Permalink
mypy: make quixstreams.platforms.* pass type checks
Browse files (browse the repository at this point in the history)
  • Loading branch information
quentin-quix committed Dec 9, 2024
1 parent 211f450 commit 650a205
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 44 deletions.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ ignore_errors = true
module = [
"quixstreams.core.*",
"quixstreams.dataframe.*",
"quixstreams.platforms.*",
"quixstreams.rowproducer.*"
]
ignore_errors = true
7 changes: 5 additions & 2 deletions quixstreams/models/topics/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class TopicManager:
"""

# Default topic params
default_num_partitions = 1
default_replication_factor = 1
default_num_partitions: Optional[int] = 1
default_replication_factor: Optional[int] = 1
default_extra_config: dict[str, str] = {}

# Max topic name length for the new topics
Expand Down Expand Up @@ -217,6 +217,9 @@ def _get_source_topic_config(
if topic_config is None:
raise RuntimeError(f"No configuration can be found for topic {topic_name}")

if topic_config is None:
raise RuntimeError(f"No configuration can be found for topic {topic_name}")

# Copy only certain configuration values from original topic
if extras_imports:
topic_config.extra_config = {
Expand Down
4 changes: 2 additions & 2 deletions quixstreams/models/topics/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class TopicConfig:
Generally used by Topic and any topic creation procedures.
"""

num_partitions: int
replication_factor: int
num_partitions: Optional[int]
replication_factor: Optional[int]
extra_config: dict[str, str] = dataclasses.field(default_factory=dict)

def as_dict(self):
Expand Down
9 changes: 5 additions & 4 deletions quixstreams/platforms/quix/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,21 @@ class QuixPortalApiService:

def __init__(
self,
auth_token: Optional[str] = None,
auth_token: str,
portal_api: Optional[str] = None,
api_version: Optional[str] = None,
default_workspace_id: Optional[str] = None,
):
self._portal_api = (
portal_api or QUIX_ENVIRONMENT.portal_api or DEFAULT_PORTAL_API_URL
)
self._auth_token = auth_token or QUIX_ENVIRONMENT.sdk_token
if not self._auth_token:
if not auth_token:
raise MissingConnectionRequirements(
f"A Quix Cloud auth token (SDK or PAT) is required; "
f"set with environment variable {QUIX_ENVIRONMENT.SDK_TOKEN}"
)
self._auth_token = auth_token

self._default_workspace_id = (
default_workspace_id or QUIX_ENVIRONMENT.workspace_id
)
Expand Down Expand Up @@ -133,7 +134,7 @@ def get_workspace_certificate(
f"/workspaces/{workspace_id}/certificates", timeout=timeout
).content
if not content:
return
return None

with ZipFile(BytesIO(content)) as z:
with z.open("ca.cert") as f:
Expand Down
57 changes: 36 additions & 21 deletions quixstreams/platforms/quix/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
from copy import deepcopy
from typing import List, Optional
from typing import Any, List, Optional

from requests import HTTPError

Expand Down Expand Up @@ -122,17 +122,17 @@ def __init__(
try:
self._workspace_id = workspace_id or self.api.default_workspace_id
except UndefinedQuixWorkspaceId:
self._workspace_id = None
self._workspace_id = ""
logger.warning(
"'workspace_id' argument was not provided nor set with "
"'Quix__Workspace__Id' environment; if only one Workspace ID for the "
"provided auth token exists (often true with SDK tokens), "
"then that ID will be used. Otherwise, provide a known topic name to "
"method 'get_workspace_info(topic)' to obtain desired Workspace ID."
)
self._librdkafka_connect_config = None
self._quix_broker_settings = None
self._workspace_meta = None
self._librdkafka_connect_config: Optional[ConnectionConfig] = None
self._quix_broker_settings: dict[str, Any] = {}
self._workspace_meta: dict[str, Any] = {}
self._timeout = timeout
self._topic_create_timeout = topic_create_timeout

Expand Down Expand Up @@ -175,18 +175,22 @@ def librdkafka_extra_config(self) -> dict:
}

@classmethod
def convert_topic_response(cls, api_response: dict) -> Topic:
def convert_topic_response(
cls, api_response: dict, extra_config: Optional[dict] = None
) -> Topic:
"""
Converts a GET or POST ("create") topic API response to a Topic object
:param api_response: the dict response from a get or create topic call
:return: a corresponding Topic object
"""
if extra_config is None:
extra_config = {}

topic_config = api_response["configuration"]
extra_config = {
"retention.ms": topic_config["retentionInMinutes"] * 60 * 1000,
"retention.bytes": topic_config["retentionInBytes"],
}
extra_config["retention.ms"] = topic_config["retentionInMinutes"] * 60 * 1000
extra_config["retention.bytes"] = topic_config["retentionInBytes"]

# Map value returned by Quix API to Kafka Admin API format
if topic_config.get("cleanupPolicy"):
cleanup_policy = _quix_cleanup_policy_to_kafka(
Expand Down Expand Up @@ -254,6 +258,7 @@ def search_for_workspace(
for ws in ws_list:
if ws["name"] == workspace_name_or_id:
return ws
raise

def _set_workspace_info(self, workspace_data: dict):
ws_data = deepcopy(workspace_data)
Expand All @@ -280,14 +285,16 @@ def get_workspace_info(
:param timeout: response timeout (seconds); Default 30
"""
# TODO: more error handling with the wrong combo of ws_id and topic
ws_data: Optional[dict] = None
if self._workspace_id:
ws_data = self.search_for_workspace(
workspace_name_or_id=self._workspace_id, timeout=timeout
)
else:
elif known_workspace_topic:
ws_data = self.search_for_topic_workspace(
known_workspace_topic, timeout=timeout
)

if not ws_data:
raise NoWorkspaceFound(
"No workspace was found for the given workspace/auth-token/topic combo"
Expand Down Expand Up @@ -316,6 +323,8 @@ def search_workspace_for_topic(
if t["name"] == topic or t["id"] == topic:
return workspace_id

return None

def search_for_topic_workspace(
self, topic: str, timeout: Optional[float] = None
) -> Optional[dict]:
Expand Down Expand Up @@ -345,6 +354,8 @@ def search_for_topic_workspace(
):
return ws

return None

def create_topic(self, topic: Topic, timeout: Optional[float] = None):
"""
The actual API call to create the topic.
Expand All @@ -353,6 +364,8 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None):
:param timeout: response timeout (seconds); Default 30
"""
cfg = topic.config
if cfg is None:
raise RuntimeError("Topic config not set")

# settings that must be ints or Nones
ret_ms = cfg.extra_config.get("retention.ms")
Expand All @@ -366,7 +379,7 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None):
topic_rep_factor=cfg.replication_factor,
topic_ret_bytes=ret_bytes if ret_bytes is None else int(ret_bytes),
topic_ret_minutes=ret_ms if ret_ms is None else int(ret_ms) // 60000,
cleanup_policy=cfg.extra_config.get("cleanup.policy"),
cleanup_policy=cfg.extra_config.get("cleanup.policy"), # type: ignore[arg-type]
timeout=timeout if timeout is not None else self._timeout,
)
logger.debug(
Expand All @@ -391,15 +404,17 @@ def get_or_create_topic(
try:
return self.get_topic(topic_name=topic.name, timeout=timeout)
except QuixApiRequestFailure as e:
if e.status_code == 404:
# Topic likely does not exist (anything but success 404's; could inspect
# error string, but that creates a dependency on it never changing).
try:
return self.create_topic(topic, timeout=timeout)
except QuixApiRequestFailure:
# Multiple apps likely tried to create at the same time.
# If this fails, it raises with all previous API errors
return self.get_topic(topic_name=topic.name, timeout=timeout)
if e.status_code != 404:
raise

# Topic likely does not exist (anything but success 404's; could inspect
# error string, but that creates a dependency on it never changing).
try:
return self.create_topic(topic, timeout=timeout)
except QuixApiRequestFailure:
# Multiple apps likely tried to create at the same time.
# If this fails, it raises with all previous API errors
return self.get_topic(topic_name=topic.name, timeout=timeout)

def wait_for_topic_ready_statuses(
self,
Expand Down
23 changes: 11 additions & 12 deletions quixstreams/platforms/quix/topic_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal
from typing import List, Literal, Optional

from quixstreams.models.topics import Topic, TopicAdmin, TopicManager

Expand All @@ -22,8 +22,8 @@ class QuixTopicManager(TopicManager):

# Default topic params
# Set these to None to use defaults defined in Quix Cloud
default_num_partitions: None = None
default_replication_factor: None = None
default_num_partitions = None
default_replication_factor = None

# Max topic name length for the new topics
_max_topic_name_len = 249
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(
auto_create_topics=auto_create_topics,
)
self._quix_config_builder = quix_config_builder
self._topic_id_to_name = {}
self._topic_id_to_name: dict[str, str] = {}

def _finalize_topic(self, topic: Topic) -> Topic:
"""
Expand All @@ -64,12 +64,11 @@ def _finalize_topic(self, topic: Topic) -> Topic:
Additionally, sets the actual topic configuration since we now have it anyway.
"""
quix_topic_info = self._quix_config_builder.get_or_create_topic(topic)
quix_topic = self._quix_config_builder.convert_topic_response(quix_topic_info)
# allows us to include the configs not included in the API response
quix_topic.config.extra_config = {
**topic.config.extra_config,
**quix_topic.config.extra_config,
}
quix_topic = self._quix_config_builder.convert_topic_response(
quix_topic_info,
extra_config=topic.config.extra_config if topic.config else {},
)

topic_out = topic.__clone__(name=quix_topic.name, config=quix_topic.config)
self._topic_id_to_name[topic_out.name] = quix_topic_info["name"]
return super()._finalize_topic(topic_out)
Expand All @@ -86,7 +85,7 @@ def _create_topics(
def _internal_name(
self,
topic_type: Literal["changelog", "repartition"],
topic_name: str,
topic_name: Optional[str],
suffix: str,
):
"""
Expand All @@ -107,6 +106,6 @@ def _internal_name(
"""
return super()._internal_name(
topic_type,
self._topic_id_to_name[topic_name],
self._topic_id_to_name[topic_name] if topic_name else None,
suffix,
)
7 changes: 7 additions & 0 deletions quixstreams/sources/core/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ def _validate_topics(self) -> None:
target_topic_config,
)

# should never happen
if (
source_topic_config.num_partitions is None
or target_topic_config.num_partitions is None
):
return

if source_topic_config.num_partitions > target_topic_config.num_partitions:
raise ValueError("Source topic has more partitions than destination topic")
elif source_topic_config.num_partitions < target_topic_config.num_partitions:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_quixstreams/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ def factory(workspace_id: Optional[str] = None):
strip_workspace_id_prefix(workspace_id, s) if workspace_id else s
)

cfg_builder.convert_topic_response.side_effect = (
lambda topic: QuixKafkaConfigsBuilder.convert_topic_response(topic)
cfg_builder.convert_topic_response = (
QuixKafkaConfigsBuilder.convert_topic_response
)

# Mock the create API call and return this response.
Expand Down

0 comments on commit 650a205

Please sign in to comment.