diff --git a/quixstreams/app.py b/quixstreams/app.py index 0e639c4ef..a325619b3 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -665,7 +665,13 @@ def add_source(self, source: BaseSource, topic: Optional[Topic] = None) -> Topic Default: the source default """ if not topic: - topic = self._topic_manager.register(source.default_topic()) + # Prefix the name of the default topic generated by the Source + # for visibility across all the topics. + default_topic = source.default_topic() + default_topic = default_topic.__clone__( + name=f"source__{default_topic.name}" + ) + topic = self._topic_manager.register(default_topic) self._source_manager.register( source, diff --git a/tests/test_quixstreams/test_app.py b/tests/test_quixstreams/test_app.py index a0f89a76d..9187f691a 100644 --- a/tests/test_quixstreams/test_app.py +++ b/tests/test_quixstreams/test_app.py @@ -2175,9 +2175,7 @@ def wait_finished(self, app, event, timeout=15.0): app.stop() def test_run_with_source_success( - self, - app_factory, - executor, + self, app_factory, executor, topic_manager_factory ): done = Future() processed_count = 0 @@ -2192,12 +2190,18 @@ def on_message_processed(topic_, partition, offset): if processed_count == self.MESSAGES_COUNT: done.set_result(True) + topic_manager = topic_manager_factory() app = app_factory( - auto_offset_reset="earliest", on_message_processed=on_message_processed + auto_offset_reset="earliest", + on_message_processed=on_message_processed, + topic_manager=topic_manager, ) source = DummySource(values=range(self.MESSAGES_COUNT)) sdf = app.dataframe(source=source) + default_topic_name = f"source__{source.default_topic().name}" + assert default_topic_name in topic_manager.topics + executor.submit(_stop_app_on_future, app, done, 10.0) values = []