Skip to content

Commit

Permalink
Prefix names for auto-generated source topics (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniil-quix authored Nov 26, 2024
1 parent 63cb043 commit 8773629
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
8 changes: 7 additions & 1 deletion quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,13 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic
Default: the source default
"""
if not topic:
topic = self._topic_manager.register(source.default_topic())
# Prefix the name of the default topic generated by the Source
# for visibility across all the topics.
default_topic = source.default_topic()
default_topic = default_topic.__clone__(
name=f"source__{default_topic.name}"
)
topic = self._topic_manager.register(default_topic)

self._source_manager.register(
source,
Expand Down
12 changes: 8 additions & 4 deletions tests/test_quixstreams/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2175,9 +2175,7 @@ def wait_finished(self, app, event, timeout=15.0):
app.stop()

def test_run_with_source_success(
self,
app_factory,
executor,
self, app_factory, executor, topic_manager_factory
):
done = Future()
processed_count = 0
Expand All @@ -2192,12 +2190,18 @@ def on_message_processed(topic_, partition, offset):
if processed_count == self.MESSAGES_COUNT:
done.set_result(True)

topic_manager = topic_manager_factory()
app = app_factory(
auto_offset_reset="earliest", on_message_processed=on_message_processed
auto_offset_reset="earliest",
on_message_processed=on_message_processed,
topic_manager=topic_manager,
)
source = DummySource(values=range(self.MESSAGES_COUNT))
sdf = app.dataframe(source=source)

default_topic_name = f"source__{source.default_topic().name}"
assert default_topic_name in topic_manager.topics

executor.submit(_stop_app_on_future, app, done, 10.0)

values = []
Expand Down

0 comments on commit 8773629

Please sign in to comment.