Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: configuring broker_address take priority over quix config #384

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(
Accepts string with Kafka broker host and port formatted as `<host>:<port>`,
or a ConnectionConfig object if authentication is required.
Either this OR `quix_sdk_token` must be set to use `Application` (not both).
Takes priority over quix auto-configuration.
Linked Environment Variable: `Quix__Broker__Address`.
Default: `None`
:param quix_sdk_token: If using the Quix Cloud, the SDK token to connect with.
Expand Down Expand Up @@ -197,27 +198,32 @@ def __init__(
"Quix__Consumer_Group", "quixstreams-default"
)

if quix_config_builder:
quix_app_source = "Quix Config Builder"
if quix_config_builder and quix_sdk_token:
raise warnings.warn(
"'quix_config_builder' is not necessary when an SDK token is defined; "
"we recommend letting the Application generate it automatically"
)

if quix_sdk_token and not quix_config_builder:
quix_app_source = "Quix SDK Token"
quix_config_builder = QuixKafkaConfigsBuilder(quix_sdk_token=quix_sdk_token)
if broker_address:
# If broker_address is passed to the app it takes priority over any quix configuration
self._is_quix_app = False
topic_manager_factory = TopicManager
if isinstance(broker_address, str):
broker_address = ConnectionConfig(bootstrap_servers=broker_address)
else:
self._is_quix_app = True

if quix_config_builder:
quix_app_source = "Quix Config Builder"
if quix_sdk_token:
warnings.warn(
"'quix_config_builder' is not necessary when an SDK token is defined; "
"we recommend letting the Application generate it automatically"
)
elif quix_sdk_token:
quix_app_source = "Quix SDK Token"
quix_config_builder = QuixKafkaConfigsBuilder(
quix_sdk_token=quix_sdk_token
)
else:
raise ValueError(
'Either "broker_address" or "quix_sdk_token" must be provided'
)

if broker_address and quix_config_builder:
raise ValueError(
'Cannot provide both "broker_address" and "quix_sdk_token"'
)
elif not (broker_address or quix_config_builder):
raise ValueError(
'Either "broker_address" or "quix_sdk_token" must be provided'
)
elif quix_config_builder:
# SDK Token or QuixKafkaConfigsBuilder were provided
logger.info(
f"{quix_app_source} detected; "
Expand All @@ -234,13 +240,6 @@ def __init__(
consumer_group = quix_app_config.consumer_group
consumer_extra_config.update(quix_app_config.librdkafka_extra_config)
producer_extra_config.update(quix_app_config.librdkafka_extra_config)
else:
# Only broker address is provided
topic_manager_factory = TopicManager
if isinstance(broker_address, str):
broker_address = ConnectionConfig(bootstrap_servers=broker_address)

self._is_quix_app = bool(quix_config_builder)

self._broker_address = broker_address
self._consumer_group = consumer_group
Expand Down Expand Up @@ -301,6 +300,10 @@ def __init__(
state_manager=self._state_manager,
)

@property
def is_quix_app(self):
return self._is_quix_app

@classmethod
def Quix(
cls,
Expand Down Expand Up @@ -699,7 +702,7 @@ def run(
f'auto_offset_reset="{self._auto_offset_reset}" '
f"commit_interval={self._commit_interval}s"
)
if self._is_quix_app:
if self.is_quix_app:
self._quix_runtime_init()

self._setup_topics()
Expand Down
18 changes: 9 additions & 9 deletions tests/test_quixstreams/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,12 +748,13 @@ def get_cfg_builder(quix_sdk_token):
) as consumer_init_mock, patch(
"quixstreams.app.RowProducer"
) as producer_init_mock:
Application(
app = Application(
consumer_group=consumer_group,
quix_sdk_token=quix_sdk_token,
consumer_extra_config=extra_config,
producer_extra_config=extra_config,
)
assert app.is_quix_app

# Check if items from the Quix config have been passed
# to the low-level configs of producer and consumer
Expand Down Expand Up @@ -886,14 +887,13 @@ def get_cfg_builder(quix_sdk_token):
assert consumer_call_kwargs["consumer_group"] == expected_workspace_cgroup
assert consumer_call_kwargs["extra_config"] == expected_consumer_extra_config

def test_init_with_broker_id_raises(self):
with pytest.raises(ValueError) as e_info:
Application(
broker_address="address",
quix_config_builder=create_autospec(QuixKafkaConfigsBuilder),
)
error_str = 'Cannot provide both "broker_address" and "quix_sdk_token"'
assert error_str in e_info.value.args
def test_init_with_broker_id_dont_raises(self):
app = Application(
broker_address="address",
quix_config_builder=create_autospec(QuixKafkaConfigsBuilder),
)

assert not app.is_quix_app

def test_topic_name_and_config(self, quix_app_factory):
"""
Expand Down
Loading