diff --git a/quixstreams/app.py b/quixstreams/app.py index 5cc1b050c..8e42f880f 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -120,6 +120,7 @@ def __init__( Accepts string with Kafka broker host and port formatted as `:`, or a ConnectionConfig object if authentication is required. Either this OR `quix_sdk_token` must be set to use `Application` (not both). + Takes priority over quix auto-configuration. Linked Environment Variable: `Quix__Broker__Address`. Default: `None` :param quix_sdk_token: If using the Quix Cloud, the SDK token to connect with. @@ -197,27 +198,32 @@ def __init__( "Quix__Consumer_Group", "quixstreams-default" ) - if quix_config_builder: - quix_app_source = "Quix Config Builder" - if quix_config_builder and quix_sdk_token: - raise warnings.warn( - "'quix_config_builder' is not necessary when an SDK token is defined; " - "we recommend letting the Application generate it automatically" - ) - - if quix_sdk_token and not quix_config_builder: - quix_app_source = "Quix SDK Token" - quix_config_builder = QuixKafkaConfigsBuilder(quix_sdk_token=quix_sdk_token) + if broker_address: + # If broker_address is passed to the app it takes priority over any quix configuration + self._is_quix_app = False + topic_manager_factory = TopicManager + if isinstance(broker_address, str): + broker_address = ConnectionConfig(bootstrap_servers=broker_address) + else: + self._is_quix_app = True + + if quix_config_builder: + quix_app_source = "Quix Config Builder" + if quix_sdk_token: + warnings.warn( + "'quix_config_builder' is not necessary when an SDK token is defined; " + "we recommend letting the Application generate it automatically" + ) + elif quix_sdk_token: + quix_app_source = "Quix SDK Token" + quix_config_builder = QuixKafkaConfigsBuilder( + quix_sdk_token=quix_sdk_token + ) + else: + raise ValueError( + 'Either "broker_address" or "quix_sdk_token" must be provided' + ) - if broker_address and quix_config_builder: - raise ValueError( - 'Cannot provide both "broker_address" and "quix_sdk_token"' - ) - elif not (broker_address or quix_config_builder): - raise ValueError( - 'Either "broker_address" or "quix_sdk_token" must be provided' - ) - elif quix_config_builder: # SDK Token or QuixKafkaConfigsBuilder were provided logger.info( f"{quix_app_source} detected; " @@ -234,13 +240,6 @@ def __init__( consumer_group = quix_app_config.consumer_group consumer_extra_config.update(quix_app_config.librdkafka_extra_config) producer_extra_config.update(quix_app_config.librdkafka_extra_config) - else: - # Only broker address is provided - topic_manager_factory = TopicManager - if isinstance(broker_address, str): - broker_address = ConnectionConfig(bootstrap_servers=broker_address) - - self._is_quix_app = bool(quix_config_builder) self._broker_address = broker_address self._consumer_group = consumer_group @@ -301,6 +300,10 @@ def __init__( state_manager=self._state_manager, ) + @property + def is_quix_app(self): + return self._is_quix_app + @classmethod def Quix( cls, @@ -699,7 +702,7 @@ def run( f'auto_offset_reset="{self._auto_offset_reset}" ' f"commit_interval={self._commit_interval}s" ) - if self._is_quix_app: + if self.is_quix_app: self._quix_runtime_init() self._setup_topics() diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py index cdafe8267..c49b015a6 100644 --- a/tests/test_quixstreams/test_app.py +++ b/tests/test_quixstreams/test_app.py @@ -748,12 +748,13 @@ def get_cfg_builder(quix_sdk_token): ) as consumer_init_mock, patch( "quixstreams.app.RowProducer" ) as producer_init_mock: - Application( + app = Application( consumer_group=consumer_group, quix_sdk_token=quix_sdk_token, consumer_extra_config=extra_config, producer_extra_config=extra_config, ) + assert app.is_quix_app # Check if items from the Quix config have been passed # to the low-level configs of producer and consumer @@ -886,14 +887,13 @@ def get_cfg_builder(quix_sdk_token): assert consumer_call_kwargs["consumer_group"] == expected_workspace_cgroup assert consumer_call_kwargs["extra_config"] == expected_consumer_extra_config - def test_init_with_broker_id_raises(self): - with pytest.raises(ValueError) as e_info: - Application( - broker_address="address", - quix_config_builder=create_autospec(QuixKafkaConfigsBuilder), - ) - error_str = 'Cannot provide both "broker_address" and "quix_sdk_token"' - assert error_str in e_info.value.args + def test_init_with_broker_id_dont_raises(self): + app = Application( + broker_address="address", + quix_config_builder=create_autospec(QuixKafkaConfigsBuilder), + ) + + assert not app.is_quix_app def test_topic_name_and_config(self, quix_app_factory): """