From f9dbdd70aca97e7bb845802e6ca3569a210255f7 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang Date: Mon, 17 Jun 2024 20:51:09 -0700 Subject: [PATCH] temp commit --- CMakeLists.txt | 2 +- .../node/{ => operators}/dynamic_batcher.hpp | 9 +++- cpp/mrc/tests/test_segment.cpp | 43 +++++++++++++++++++ mrc.code-workspace | 40 +++++++++++++---- 4 files changed, 83 insertions(+), 11 deletions(-) rename cpp/mrc/include/mrc/node/{ => operators}/dynamic_batcher.hpp (94%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e9931166..7a1accb04 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -122,7 +122,7 @@ enable_language(CUDA) set(MRC_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) # Set a default build type if none was specified -rapids_cmake_build_type(Release) +rapids_cmake_build_type(Debug) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) diff --git a/cpp/mrc/include/mrc/node/dynamic_batcher.hpp b/cpp/mrc/include/mrc/node/operators/dynamic_batcher.hpp similarity index 94% rename from cpp/mrc/include/mrc/node/dynamic_batcher.hpp rename to cpp/mrc/include/mrc/node/operators/dynamic_batcher.hpp index 539815423..8f68ac85c 100644 --- a/cpp/mrc/include/mrc/node/dynamic_batcher.hpp +++ b/cpp/mrc/include/mrc/node/operators/dynamic_batcher.hpp @@ -30,6 +30,7 @@ #include "mrc/node/rx_subscribable.hpp" #include "mrc/runnable/runnable.hpp" #include "mrc/utils/type_utils.hpp" +#include "rxcpp/operators/rx-observe_on.hpp" #include #include @@ -39,6 +40,7 @@ #include #include +namespace mrc::node { template class DynamicBatcher : public mrc::node::WritableProvider, public mrc::node::ReadableAcceptor, @@ -81,9 +83,11 @@ class DynamicBatcher : public mrc::node::WritableProvider, s.on_completed(); }); + // DVLOG(1) << "DynamicBatcher: m_duration: " << m_duration.count() << std::endl; + // Buffer the items from the input observable auto buffered_observable = input_observable.buffer_with_time_or_count( - m_duration, m_max_count, rxcpp::observe_on_new_thread()); + m_duration, m_max_count, rxcpp::observe_on_event_loop()); // Subscribe to the buffered observable buffered_observable.subscribe( @@ -122,6 +126,7 @@ class DynamicBatcher : public mrc::node::WritableProvider, } std::stop_source m_stop_source; - size_t m_max_count; + int m_max_count; std::chrono::milliseconds m_duration; }; +} // namespace mrc::node diff --git a/cpp/mrc/tests/test_segment.cpp b/cpp/mrc/tests/test_segment.cpp index bd3b09d78..4792ab28c 100644 --- a/cpp/mrc/tests/test_segment.cpp +++ b/cpp/mrc/tests/test_segment.cpp @@ -20,6 +20,7 @@ #include "mrc/benchmarking/trace_statistics.hpp" #include "mrc/exceptions/runtime_error.hpp" #include "mrc/node/operators/broadcast.hpp" +#include "mrc/node/operators/dynamic_batcher.hpp" #include "mrc/node/rx_node.hpp" #include "mrc/node/rx_sink.hpp" #include "mrc/node/rx_source.hpp" @@ -1122,4 +1123,46 @@ TEST_F(TestSegment, SegmentGetEgressNotEgressError) */ } +TEST_F(TestSegment, SegmentDynamicBatcher) +{ + unsigned int iterations{3}; + std::atomic sink1_results{0}; + float sink2_results{0}; + std::mutex mux; + + auto init = [&](segment::IBuilder& segment) { + auto src = segment.make_source("src", [&](rxcpp::subscriber& s) { + for (size_t i = 0; i < iterations && s.is_subscribed(); i++) + { + s.on_next(1); + s.on_next(2); + s.on_next(3); + } + + s.on_completed(); + }); + + auto dynamic_batcher = segment.construct_object>("dynamic_batcher", 2, std::chrono::milliseconds(100)); + + segment.make_edge(src, dynamic_batcher); + + auto sink = segment.make_sink>("sink", [&](std::vector x) { + DVLOG(1) << "Sink got vector" << std::endl; + for (auto i : x) + { + DVLOG(1) << "Sink got value: " << i << std::endl; + // sink1_results.fetch_add(i, std::memory_order_relaxed); + } + }); + + segment.make_edge(dynamic_batcher, sink); + }; + + auto segdef = Segment::create("dynamic_batcher_test", init); + + auto pipeline = mrc::make_pipeline(); + pipeline->register_segment(std::move(segdef)); + execute_pipeline(std::move(pipeline)); +} + } // namespace mrc diff --git a/mrc.code-workspace b/mrc.code-workspace index 632b0a0e6..82cd91226 100644 --- a/mrc.code-workspace +++ b/mrc.code-workspace @@ -86,25 +86,49 @@ "type": "cppdbg" }, { - "MIMode": "gdb", + "MIMode": "lldb", "args": [], "cwd": "${workspaceFolder}", "environment": [], "externalConsole": false, - "miDebuggerPath": "gdb", - "name": "debug bench_mrc.x", - "preLaunchTask": "C/C++: g++ build active file", - "program": "${workspaceFolder}/build/benchmarks/bench_mrc", + "miDebuggerPath": "lldb", + "name": "debug test_mrc.x with lldb", + // "preLaunchTask": "C/C++: g++ build active file", + "program": "${workspaceFolder}/build/cpp/mrc/tests/test_mrc.x", "request": "launch", "setupCommands": [ { - "description": "Enable pretty-printing for gdb", + "description": "Enable pretty-printing for lldb", "ignoreFailures": true, - "text": "-enable-pretty-printing" + "text": "command script import pretty_printers.py" } ], + "justMyCode": true, "stopAtEntry": false, - "type": "cppdbg" + "type": "lldb" + }, + { + "MIMode": "lldb", + "args": [], + "cwd": "${workspaceFolder}", + "environment": [], + "externalConsole": false, + "miDebuggerPath": "lldb", + "name": "debug TestSegment.SegmentDynamicBatcher with lldb", + // "preLaunchTask": "C/C++: g++ build active file", + "program": "${workspaceFolder}/build/cpp/mrc/tests/test_mrc.x", + "request": "launch", + "setupCommands": [ + { + "description": "Enable pretty-printing for lldb", + "ignoreFailures": true, + "text": "command script import pretty_printers.py" + } + ], + "args": ["--gtest_filter=TestSegment.SegmentDynamicBatcher"], + "justMyCode": true, + "stopAtEntry": false, + "type": "lldb" }, { "MIMode": "gdb",