diff --git a/pyproject.toml b/pyproject.toml index 0d8351860..d4047f41d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,7 +125,6 @@ ignore_errors = true module = [ "quixstreams.core.*", "quixstreams.dataframe.*", - "quixstreams.platforms.*", "quixstreams.rowproducer.*" ] ignore_errors = true diff --git a/quixstreams/models/topics/manager.py b/quixstreams/models/topics/manager.py index 348134e37..47e014dc2 100644 --- a/quixstreams/models/topics/manager.py +++ b/quixstreams/models/topics/manager.py @@ -38,8 +38,8 @@ class TopicManager: """ # Default topic params - default_num_partitions = 1 - default_replication_factor = 1 + default_num_partitions: Optional[int] = 1 + default_replication_factor: Optional[int] = 1 default_extra_config: dict[str, str] = {} # Max topic name length for the new topics @@ -217,6 +217,9 @@ def _get_source_topic_config( if topic_config is None: raise RuntimeError(f"No configuration can be found for topic {topic_name}") + if topic_config is None: + raise RuntimeError(f"No configuration can be found for topic {topic_name}") + # Copy only certain configuration values from original topic if extras_imports: topic_config.extra_config = { diff --git a/quixstreams/models/topics/topic.py b/quixstreams/models/topics/topic.py index 0a9d22589..d7b887d19 100644 --- a/quixstreams/models/topics/topic.py +++ b/quixstreams/models/topics/topic.py @@ -48,8 +48,8 @@ class TopicConfig: Generally used by Topic and any topic creation procedures. """ - num_partitions: int - replication_factor: int + num_partitions: Optional[int] + replication_factor: Optional[int] extra_config: dict[str, str] = dataclasses.field(default_factory=dict) def as_dict(self): diff --git a/quixstreams/platforms/quix/api.py b/quixstreams/platforms/quix/api.py index f13ef3950..c48e3a5c3 100644 --- a/quixstreams/platforms/quix/api.py +++ b/quixstreams/platforms/quix/api.py @@ -32,7 +32,7 @@ class QuixPortalApiService: def __init__( self, - auth_token: Optional[str] = None, + auth_token: str, portal_api: Optional[str] = None, api_version: Optional[str] = None, default_workspace_id: Optional[str] = None, @@ -40,12 +40,13 @@ def __init__( self._portal_api = ( portal_api or QUIX_ENVIRONMENT.portal_api or DEFAULT_PORTAL_API_URL ) - self._auth_token = auth_token or QUIX_ENVIRONMENT.sdk_token - if not self._auth_token: + if not auth_token: raise MissingConnectionRequirements( f"A Quix Cloud auth token (SDK or PAT) is required; " f"set with environment variable {QUIX_ENVIRONMENT.SDK_TOKEN}" ) + self._auth_token = auth_token + self._default_workspace_id = ( default_workspace_id or QUIX_ENVIRONMENT.workspace_id ) @@ -133,7 +134,7 @@ def get_workspace_certificate( f"/workspaces/{workspace_id}/certificates", timeout=timeout ).content if not content: - return + return None with ZipFile(BytesIO(content)) as z: with z.open("ca.cert") as f: diff --git a/quixstreams/platforms/quix/config.py b/quixstreams/platforms/quix/config.py index 9c42ebdf9..1c7bfad3c 100644 --- a/quixstreams/platforms/quix/config.py +++ b/quixstreams/platforms/quix/config.py @@ -3,7 +3,7 @@ import logging import time from copy import deepcopy -from typing import List, Optional +from typing import Any, List, Optional from requests import HTTPError @@ -122,7 +122,7 @@ def __init__( try: self._workspace_id = workspace_id or self.api.default_workspace_id except UndefinedQuixWorkspaceId: - self._workspace_id = None + self._workspace_id = "" logger.warning( "'workspace_id' argument was not provided nor set with " "'Quix__Workspace__Id' environment; if only one Workspace ID for the " @@ -130,9 +130,9 @@ def __init__( "then that ID will be used. Otherwise, provide a known topic name to " "method 'get_workspace_info(topic)' to obtain desired Workspace ID." ) - self._librdkafka_connect_config = None - self._quix_broker_settings = None - self._workspace_meta = None + self._librdkafka_connect_config: Optional[ConnectionConfig] = None + self._quix_broker_settings: dict[str, Any] = {} + self._workspace_meta: dict[str, Any] = {} self._timeout = timeout self._topic_create_timeout = topic_create_timeout @@ -175,18 +175,22 @@ def librdkafka_extra_config(self) -> dict: } @classmethod - def convert_topic_response(cls, api_response: dict) -> Topic: + def convert_topic_response( + cls, api_response: dict, extra_config: Optional[dict] = None + ) -> Topic: """ Converts a GET or POST ("create") topic API response to a Topic object :param api_response: the dict response from a get or create topic call :return: a corresponding Topic object """ + if extra_config is None: + extra_config = {} + topic_config = api_response["configuration"] - extra_config = { - "retention.ms": topic_config["retentionInMinutes"] * 60 * 1000, - "retention.bytes": topic_config["retentionInBytes"], - } + extra_config["retention.ms"] = topic_config["retentionInMinutes"] * 60 * 1000 + extra_config["retention.bytes"] = topic_config["retentionInBytes"] + # Map value returned by Quix API to Kafka Admin API format if topic_config.get("cleanupPolicy"): cleanup_policy = _quix_cleanup_policy_to_kafka( @@ -254,6 +258,7 @@ def search_for_workspace( for ws in ws_list: if ws["name"] == workspace_name_or_id: return ws + raise def _set_workspace_info(self, workspace_data: dict): ws_data = deepcopy(workspace_data) @@ -280,14 +285,16 @@ def get_workspace_info( :param timeout: response timeout (seconds); Default 30 """ # TODO: more error handling with the wrong combo of ws_id and topic + ws_data: Optional[dict] = None if self._workspace_id: ws_data = self.search_for_workspace( workspace_name_or_id=self._workspace_id, timeout=timeout ) - else: + elif known_workspace_topic: ws_data = self.search_for_topic_workspace( known_workspace_topic, timeout=timeout ) + if not ws_data: raise NoWorkspaceFound( "No workspace was found for the given workspace/auth-token/topic combo" @@ -316,6 +323,8 @@ def search_workspace_for_topic( if t["name"] == topic or t["id"] == topic: return workspace_id + return None + def search_for_topic_workspace( self, topic: str, timeout: Optional[float] = None ) -> Optional[dict]: @@ -345,6 +354,8 @@ def search_for_topic_workspace( ): return ws + return None + def create_topic(self, topic: Topic, timeout: Optional[float] = None): """ The actual API call to create the topic. @@ -353,6 +364,8 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None): :param timeout: response timeout (seconds); Default 30 """ cfg = topic.config + if cfg is None: + raise RuntimeError("Topic config not set") # settings that must be ints or Nones ret_ms = cfg.extra_config.get("retention.ms") @@ -366,7 +379,7 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None): topic_rep_factor=cfg.replication_factor, topic_ret_bytes=ret_bytes if ret_bytes is None else int(ret_bytes), topic_ret_minutes=ret_ms if ret_ms is None else int(ret_ms) // 60000, - cleanup_policy=cfg.extra_config.get("cleanup.policy"), + cleanup_policy=cfg.extra_config.get("cleanup.policy"), # type: ignore[arg-type] timeout=timeout if timeout is not None else self._timeout, ) logger.debug( @@ -391,15 +404,17 @@ def get_or_create_topic( try: return self.get_topic(topic_name=topic.name, timeout=timeout) except QuixApiRequestFailure as e: - if e.status_code == 404: - # Topic likely does not exist (anything but success 404's; could inspect - # error string, but that creates a dependency on it never changing). - try: - return self.create_topic(topic, timeout=timeout) - except QuixApiRequestFailure: - # Multiple apps likely tried to create at the same time. - # If this fails, it raises with all previous API errors - return self.get_topic(topic_name=topic.name, timeout=timeout) + if e.status_code != 404: + raise + + # Topic likely does not exist (anything but success 404's; could inspect + # error string, but that creates a dependency on it never changing). + try: + return self.create_topic(topic, timeout=timeout) + except QuixApiRequestFailure: + # Multiple apps likely tried to create at the same time. + # If this fails, it raises with all previous API errors + return self.get_topic(topic_name=topic.name, timeout=timeout) def wait_for_topic_ready_statuses( self, diff --git a/quixstreams/platforms/quix/topic_manager.py b/quixstreams/platforms/quix/topic_manager.py index 753a58685..1f3d76959 100644 --- a/quixstreams/platforms/quix/topic_manager.py +++ b/quixstreams/platforms/quix/topic_manager.py @@ -1,4 +1,4 @@ -from typing import List, Literal +from typing import List, Literal, Optional from quixstreams.models.topics import Topic, TopicAdmin, TopicManager @@ -22,8 +22,8 @@ class QuixTopicManager(TopicManager): # Default topic params # Set these to None to use defaults defined in Quix Cloud - default_num_partitions: None = None - default_replication_factor: None = None + default_num_partitions = None + default_replication_factor = None # Max topic name length for the new topics _max_topic_name_len = 249 @@ -54,7 +54,7 @@ def __init__( auto_create_topics=auto_create_topics, ) self._quix_config_builder = quix_config_builder - self._topic_id_to_name = {} + self._topic_id_to_name: dict[str, str] = {} def _finalize_topic(self, topic: Topic) -> Topic: """ @@ -64,12 +64,11 @@ def _finalize_topic(self, topic: Topic) -> Topic: Additionally, sets the actual topic configuration since we now have it anyway. """ quix_topic_info = self._quix_config_builder.get_or_create_topic(topic) - quix_topic = self._quix_config_builder.convert_topic_response(quix_topic_info) - # allows us to include the configs not included in the API response - quix_topic.config.extra_config = { - **topic.config.extra_config, - **quix_topic.config.extra_config, - } + quix_topic = self._quix_config_builder.convert_topic_response( + quix_topic_info, + extra_config=topic.config.extra_config if topic.config else {}, + ) + topic_out = topic.__clone__(name=quix_topic.name, config=quix_topic.config) self._topic_id_to_name[topic_out.name] = quix_topic_info["name"] return super()._finalize_topic(topic_out) @@ -86,7 +85,7 @@ def _create_topics( def _internal_name( self, topic_type: Literal["changelog", "repartition"], - topic_name: str, + topic_name: Optional[str], suffix: str, ): """ @@ -107,6 +106,6 @@ def _internal_name( """ return super()._internal_name( topic_type, - self._topic_id_to_name[topic_name], + self._topic_id_to_name[topic_name] if topic_name else None, suffix, ) diff --git a/quixstreams/sources/core/kafka/kafka.py b/quixstreams/sources/core/kafka/kafka.py index 398337b12..ed7b3d617 100644 --- a/quixstreams/sources/core/kafka/kafka.py +++ b/quixstreams/sources/core/kafka/kafka.py @@ -302,6 +302,13 @@ def _validate_topics(self) -> None: target_topic_config, ) + # should never happen + if ( + source_topic_config.num_partitions is None + or target_topic_config.num_partitions is None + ): + return + if source_topic_config.num_partitions > target_topic_config.num_partitions: raise ValueError("Source topic has more partitions than destination topic") elif source_topic_config.num_partitions < target_topic_config.num_partitions: diff --git a/tests/test_quixstreams/fixtures.py b/tests/test_quixstreams/fixtures.py index d41550645..1cc941c31 100644 --- a/tests/test_quixstreams/fixtures.py +++ b/tests/test_quixstreams/fixtures.py @@ -378,8 +378,8 @@ def factory(workspace_id: Optional[str] = None): strip_workspace_id_prefix(workspace_id, s) if workspace_id else s ) - cfg_builder.convert_topic_response.side_effect = ( - lambda topic: QuixKafkaConfigsBuilder.convert_topic_response(topic) + cfg_builder.convert_topic_response = ( + QuixKafkaConfigsBuilder.convert_topic_response ) # Mock the create API call and return this response.