Skip to content

Commit

Permalink
Bug: configuring broker_address take priority over quix config
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Jun 14, 2024
1 parent 6c6305d commit 6f1d84e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 37 deletions.
59 changes: 31 additions & 28 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(
Accepts string with Kafka broker host and port formatted as `<host>:<port>`,
or a ConnectionConfig object if authentication is required.
Either this OR `quix_sdk_token` must be set to use `Application` (not both).
Takes priority over quix auto-configuration.
Linked Environment Variable: `Quix__Broker__Address`.
Default: `None`
:param quix_sdk_token: If using the Quix Cloud, the SDK token to connect with.
Expand Down Expand Up @@ -197,27 +198,32 @@ def __init__(
"Quix__Consumer_Group", "quixstreams-default"
)

if quix_config_builder:
quix_app_source = "Quix Config Builder"
if quix_config_builder and quix_sdk_token:
raise warnings.warn(
"'quix_config_builder' is not necessary when an SDK token is defined; "
"we recommend letting the Application generate it automatically"
)

if quix_sdk_token and not quix_config_builder:
quix_app_source = "Quix SDK Token"
quix_config_builder = QuixKafkaConfigsBuilder(quix_sdk_token=quix_sdk_token)
if broker_address:
# If broker_address is passed to the app it takes priority over any quix configuration
self._is_quix_app = False
topic_manager_factory = TopicManager
if isinstance(broker_address, str):
broker_address = ConnectionConfig(bootstrap_servers=broker_address)
else:
self._is_quix_app = True

if quix_config_builder:
quix_app_source = "Quix Config Builder"
if quix_sdk_token:
warnings.warn(
"'quix_config_builder' is not necessary when an SDK token is defined; "
"we recommend letting the Application generate it automatically"
)
elif quix_sdk_token:
quix_app_source = "Quix SDK Token"
quix_config_builder = QuixKafkaConfigsBuilder(
quix_sdk_token=quix_sdk_token
)
else:
raise ValueError(
'Either "broker_address" or "quix_sdk_token" must be provided'
)

if broker_address and quix_config_builder:
raise ValueError(
'Cannot provide both "broker_address" and "quix_sdk_token"'
)
elif not (broker_address or quix_config_builder):
raise ValueError(
'Either "broker_address" or "quix_sdk_token" must be provided'
)
elif quix_config_builder:
# SDK Token or QuixKafkaConfigsBuilder were provided
logger.info(
f"{quix_app_source} detected; "
Expand All @@ -234,13 +240,6 @@ def __init__(
consumer_group = quix_app_config.consumer_group
consumer_extra_config.update(quix_app_config.librdkafka_extra_config)
producer_extra_config.update(quix_app_config.librdkafka_extra_config)
else:
# Only broker address is provided
topic_manager_factory = TopicManager
if isinstance(broker_address, str):
broker_address = ConnectionConfig(bootstrap_servers=broker_address)

self._is_quix_app = bool(quix_config_builder)

self._broker_address = broker_address
self._consumer_group = consumer_group
Expand Down Expand Up @@ -301,6 +300,10 @@ def __init__(
state_manager=self._state_manager,
)

@property
def is_quix_app(self):
return self._is_quix_app

@classmethod
def Quix(
cls,
Expand Down Expand Up @@ -699,7 +702,7 @@ def run(
f'auto_offset_reset="{self._auto_offset_reset}" '
f"commit_interval={self._commit_interval}s"
)
if self._is_quix_app:
if self.is_quix_app:
self._quix_runtime_init()

self._setup_topics()
Expand Down
18 changes: 9 additions & 9 deletions tests/test_quixstreams/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,12 +748,13 @@ def get_cfg_builder(quix_sdk_token):
) as consumer_init_mock, patch(
"quixstreams.app.RowProducer"
) as producer_init_mock:
Application(
app = Application(
consumer_group=consumer_group,
quix_sdk_token=quix_sdk_token,
consumer_extra_config=extra_config,
producer_extra_config=extra_config,
)
assert app.is_quix_app

# Check if items from the Quix config have been passed
# to the low-level configs of producer and consumer
Expand Down Expand Up @@ -886,14 +887,13 @@ def get_cfg_builder(quix_sdk_token):
assert consumer_call_kwargs["consumer_group"] == expected_workspace_cgroup
assert consumer_call_kwargs["extra_config"] == expected_consumer_extra_config

def test_init_with_broker_id_raises(self):
with pytest.raises(ValueError) as e_info:
Application(
broker_address="address",
quix_config_builder=create_autospec(QuixKafkaConfigsBuilder),
)
error_str = 'Cannot provide both "broker_address" and "quix_sdk_token"'
assert error_str in e_info.value.args
def test_init_with_broker_id_dont_raises(self):
app = Application(
broker_address="address",
quix_config_builder=create_autospec(QuixKafkaConfigsBuilder),
)

assert not app.is_quix_app

def test_topic_name_and_config(self, quix_app_factory):
"""
Expand Down

0 comments on commit 6f1d84e

Please sign in to comment.