From 79d144b8b51b984518af3bb37d627646922f2c44 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 2 Aug 2023 11:02:51 -0700 Subject: [PATCH] Test for issue #360 --- cpp/mrc/tests/test_pipeline.cpp | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/cpp/mrc/tests/test_pipeline.cpp b/cpp/mrc/tests/test_pipeline.cpp index 196ecfbc4..c0db4308f 100644 --- a/cpp/mrc/tests/test_pipeline.cpp +++ b/cpp/mrc/tests/test_pipeline.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -160,6 +161,55 @@ TEST_F(TestPipeline, TwoSegment) LOG(INFO) << "Done" << std::endl; } +TEST_F(TestPipeline, InconsistentPipeline) +{ + // Test to reproduce issue #360 + auto pipeline = mrc::make_pipeline(); + + auto seg_1 = + pipeline->make_segment("seg_1", segment::EgressPorts({"float_port"}), [](segment::IBuilder& seg) { + auto rx_source = seg.make_source("rx_source", [](rxcpp::subscriber s) { + LOG(INFO) << "emit 1"; + s.on_next(1.0F); + LOG(INFO) << "emit 2"; + s.on_next(2.0F); + LOG(INFO) << "emit 3"; + s.on_next(3.0F); + LOG(INFO) << "issuing complete"; + s.on_completed(); + }); + + auto my_float_egress = seg.get_egress("float_port"); + + seg.make_edge(rx_source, my_float_egress); + }); + + auto seg_2 = + pipeline->make_segment("seg_2", segment::IngressPorts({"float_port"}), [&](segment::IBuilder& seg) { + auto my_float_ingress = seg.get_ingress("float_port"); + + auto rx_sink = seg.make_sink("rx_sink", + rxcpp::make_observer_dynamic( + [&](float x) { + DVLOG(1) << x << std::endl; + }, + [&]() { + DVLOG(1) << "Completed" << std::endl; + })); + + seg.make_edge(my_float_ingress, rx_sink); + throw std::runtime_error("Error in initializer"); + }); + + Executor exec(std::move(m_options)); + + exec.register_pipeline(std::move(pipeline)); + + exec.start(); + + EXPECT_THROW(exec.join(), std::runtime_error); +} + /* TEST_F(TestPipeline, TwoSegmentManualTag) {