diff --git a/mediapipe/framework/BUILD b/mediapipe/framework/BUILD index dc717d0e21..64112cd255 100644 --- a/mediapipe/framework/BUILD +++ b/mediapipe/framework/BUILD @@ -334,6 +334,7 @@ cc_library( ":delegating_executor", ":executor", ":graph_output_stream", + ":graph_runtime_info_cc_proto", ":graph_service", ":graph_service_manager", ":input_stream_manager", @@ -359,8 +360,8 @@ cc_library( ":timestamp", ":validated_graph_config", ":vlog_overrides", + "//mediapipe/framework/deps:clock", "//mediapipe/framework/port:core_proto", - "//mediapipe/framework/port:integral_types", "//mediapipe/framework/port:logging", "//mediapipe/framework/port:map_util", "//mediapipe/framework/port:ret_check", @@ -387,6 +388,7 @@ cc_library( "@com_google_absl//absl/strings", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/synchronization", + "@com_google_absl//absl/time", ], ) @@ -425,6 +427,7 @@ cc_library( ":calculator_context_manager", ":calculator_state", ":counter_factory", + ":graph_runtime_info_cc_proto", ":graph_service_manager", ":input_side_packet_handler", ":input_stream_handler", @@ -441,6 +444,7 @@ cc_library( ":stream_handler_cc_proto", ":timestamp", ":validated_graph_config", + "//mediapipe/framework/deps:clock", "//mediapipe/framework/port:core_proto", "//mediapipe/framework/port:integral_types", "//mediapipe/framework/port:logging", @@ -454,12 +458,14 @@ cc_library( "//mediapipe/framework/tool:tag_map", "//mediapipe/framework/tool:validate_name", "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/cleanup", "@com_google_absl//absl/log:absl_check", "@com_google_absl//absl/log:absl_log", "@com_google_absl//absl/memory", "@com_google_absl//absl/status", "@com_google_absl//absl/strings", "@com_google_absl//absl/synchronization", + "@com_google_absl//absl/time", ], ) @@ -647,6 +653,12 @@ cc_library( ], ) +mediapipe_proto_library( + name = "graph_runtime_info_proto", + srcs = ["graph_runtime_info.proto"], + visibility = ["//visibility:public"], +) + cc_library( name = "graph_service", hdrs = ["graph_service.h"], @@ -785,6 +797,7 @@ cc_library( "//mediapipe/framework/port:ret_check", "//mediapipe/framework/port:status", "//mediapipe/framework/tool:tag_map", + "@com_google_absl//absl/log", "@com_google_absl//absl/log:absl_check", "@com_google_absl//absl/log:absl_log", "@com_google_absl//absl/strings", @@ -1620,6 +1633,7 @@ cc_test( "//mediapipe/calculators/core:counting_source_calculator", "//mediapipe/calculators/core:mux_calculator", "//mediapipe/calculators/core:pass_through_calculator", + "//mediapipe/framework/deps:clock", "//mediapipe/framework/port:gtest_main", "//mediapipe/framework/port:parse_text_proto", "//mediapipe/framework/port:ret_check", diff --git a/mediapipe/framework/calculator_graph.cc b/mediapipe/framework/calculator_graph.cc index 3fe9f8d933..6cacd8117b 100644 --- a/mediapipe/framework/calculator_graph.cc +++ b/mediapipe/framework/calculator_graph.cc @@ -37,10 +37,12 @@ #include "absl/strings/string_view.h" #include "absl/strings/substitute.h" #include "absl/synchronization/mutex.h" +#include "absl/time/time.h" #include "mediapipe/framework/calculator.pb.h" #include "mediapipe/framework/calculator_base.h" #include "mediapipe/framework/counter_factory.h" #include "mediapipe/framework/delegating_executor.h" +#include "mediapipe/framework/deps/clock.h" #include "mediapipe/framework/executor.h" #include "mediapipe/framework/graph_output_stream.h" #include "mediapipe/framework/graph_service_manager.h" @@ -914,6 +916,17 @@ absl::Status CalculatorGraph::WaitForObservedOutput() { return scheduler_.WaitForObservedOutput(); } +absl::StatusOr CalculatorGraph::GetGraphRuntimeInfo() { + RET_CHECK(initialized_); + GraphRuntimeInfo info; + for (const auto& node : nodes_) { + *info.add_calculator_infos() = node->GetStreamMonitoringInfo(); + } + const absl::Time time_now = mediapipe::Clock::RealClock()->TimeNow(); + info.set_capture_time_unix_us(absl::ToUnixMicros(time_now)); + return info; +} + absl::Status CalculatorGraph::AddPacketToInputStream( absl::string_view stream_name, const Packet& packet) { return AddPacketToInputStreamInternal(stream_name, packet); diff --git a/mediapipe/framework/calculator_graph.h b/mediapipe/framework/calculator_graph.h index 19032fbc2f..a128d1b551 100644 --- a/mediapipe/framework/calculator_graph.h +++ b/mediapipe/framework/calculator_graph.h @@ -40,6 +40,7 @@ #include "mediapipe/framework/counter_factory.h" #include "mediapipe/framework/executor.h" #include "mediapipe/framework/graph_output_stream.h" +#include "mediapipe/framework/graph_runtime_info.pb.h" #include "mediapipe/framework/graph_service.h" #include "mediapipe/framework/graph_service_manager.h" #include "mediapipe/framework/mediapipe_profiling.h" @@ -257,6 +258,11 @@ class CalculatorGraph { // Quick non-locking means of checking if the graph has encountered an error. bool HasError() const { return has_error_; } + // Returns debugging information about the graph transient state, including + // information about all input streams and their timestamp bounds. This method + // is thread safe and can be called from any thread. + absl::StatusOr GetGraphRuntimeInfo(); + // Add a Packet to a graph input stream based on the graph input stream add // mode. If the mode is ADD_IF_NOT_FULL, the packet will not be added if any // queue exceeds max_queue_size specified by the graph config and will return diff --git a/mediapipe/framework/calculator_graph_test.cc b/mediapipe/framework/calculator_graph_test.cc index 745271e53b..ef59bf6388 100644 --- a/mediapipe/framework/calculator_graph_test.cc +++ b/mediapipe/framework/calculator_graph_test.cc @@ -43,6 +43,7 @@ #include "mediapipe/framework/calculator_framework.h" #include "mediapipe/framework/collection_item_id.h" #include "mediapipe/framework/counter_factory.h" +#include "mediapipe/framework/deps/clock.h" #include "mediapipe/framework/executor.h" #include "mediapipe/framework/input_stream_handler.h" #include "mediapipe/framework/lifetime_tracker.h" @@ -70,7 +71,6 @@ #include "mediapipe/gpu/gpu_service.h" namespace mediapipe { - namespace { constexpr char kCounter2Tag[] = "COUNTER2"; @@ -83,8 +83,9 @@ constexpr char kOutputTag[] = "OUTPUT"; constexpr char kInputTag[] = "INPUT"; constexpr char kSelectTag[] = "SELECT"; -using testing::ElementsAre; -using testing::HasSubstr; +using ::mediapipe::Clock; +using ::testing::ElementsAre; +using ::testing::HasSubstr; // Pass packets through. Note that it calls SetOffset() in Process() // instead of Open(). diff --git a/mediapipe/framework/calculator_node.cc b/mediapipe/framework/calculator_node.cc index 6767cb874d..2a7d56c934 100644 --- a/mediapipe/framework/calculator_node.cc +++ b/mediapipe/framework/calculator_node.cc @@ -17,22 +17,28 @@ #include #include #include +#include #include #include +#include +#include "absl/cleanup/cleanup.h" #include "absl/log/absl_check.h" #include "absl/log/absl_log.h" #include "absl/memory/memory.h" #include "absl/status/status.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_join.h" +#include "absl/strings/str_split.h" #include "absl/strings/string_view.h" #include "absl/strings/substitute.h" #include "absl/synchronization/mutex.h" +#include "absl/time/time.h" #include "mediapipe/framework/calculator.pb.h" #include "mediapipe/framework/calculator_base.h" #include "mediapipe/framework/calculator_state.h" #include "mediapipe/framework/counter_factory.h" +#include "mediapipe/framework/deps/clock.h" #include "mediapipe/framework/graph_service_manager.h" #include "mediapipe/framework/input_stream_manager.h" #include "mediapipe/framework/mediapipe_profiling.h" @@ -54,6 +60,7 @@ namespace mediapipe { +using ::mediapipe::Clock; namespace { const PacketType* GetPacketType(const PacketTypeSet& packet_type_set, @@ -116,7 +123,11 @@ std::unique_ptr RemoveOmittedPacketTypes( } // namespace -CalculatorNode::CalculatorNode() {} +CalculatorNode::CalculatorNode() { + absl::Time now = Clock::RealClock()->TimeNow(); + last_process_start_ts_ = now; + last_process_finish_ts_ = now; +} Timestamp CalculatorNode::SourceProcessOrder( const CalculatorContext* cc) const { @@ -218,6 +229,29 @@ absl::Status CalculatorNode::Initialize( return InitializeInputStreams(input_stream_managers, output_stream_managers); } +CalculatorRuntimeInfo CalculatorNode::GetStreamMonitoringInfo() const { + CalculatorRuntimeInfo calulator_info; + calulator_info.set_calculator_name(DebugName()); + { + absl::MutexLock lock(&runtime_info_mutex_); + calulator_info.set_last_process_start_unix_us( + absl::ToUnixMicros(last_process_start_ts_)); + calulator_info.set_last_process_finish_unix_us( + absl::ToUnixMicros(last_process_finish_ts_)); + } + const auto monitoring_info = input_stream_handler_->GetMonitoringInfo(); + for (const auto& [stream_name, queue_size, num_packets_added, + minimum_timestamp_or_bound] : monitoring_info) { + auto* stream_info = calulator_info.add_input_stream_infos(); + stream_info->set_stream_name(stream_name); + stream_info->set_queue_size(queue_size); + stream_info->set_number_of_packets_added(num_packets_added); + stream_info->set_minimum_timestamp_or_bound( + minimum_timestamp_or_bound.Value()); + } + return calulator_info; +} + absl::Status CalculatorNode::InitializeOutputSidePackets( const PacketTypeSet& output_side_packet_types, OutputSidePacketImpl* output_side_packets) { @@ -669,14 +703,14 @@ void CalculatorNode::SchedulingLoop() { max_allowance = max_in_flight_ - current_in_flight_; } while (true) { - Timestamp input_bound; - // input_bound is set to a meaningful value iff the latest readiness of the - // node is kNotReady when ScheduleInvocations() returns. - input_stream_handler_->ScheduleInvocations(max_allowance, &input_bound); - if (input_bound != Timestamp::Unset()) { + // last_timestamp_bound_ is set to a meaningful value iff the latest + // readiness of the node is kNotReady when ScheduleInvocations() returns. + input_stream_handler_->ScheduleInvocations(max_allowance, + &last_timestamp_bound_); + if (last_timestamp_bound_ != Timestamp::Unset()) { // Updates the minimum timestamp for which a new packet could possibly // arrive. - output_stream_handler_->UpdateTaskTimestampBound(input_bound); + output_stream_handler_->UpdateTaskTimestampBound(last_timestamp_bound_); } { @@ -805,6 +839,18 @@ std::string CalculatorNode::DebugName() const { // TODO: Split this function. absl::Status CalculatorNode::ProcessNode( CalculatorContext* calculator_context) { + // Update calculator runtime info. + { + absl::MutexLock lock(&runtime_info_mutex_); + last_process_start_ts_ = Clock::RealClock()->TimeNow(); + } + absl::Cleanup last_process_finish_ts_cleanup([this]() { + { + absl::MutexLock lock(&runtime_info_mutex_); + last_process_finish_ts_ = Clock::RealClock()->TimeNow(); + } + }); + if (IsSource()) { // This is a source Calculator. if (Closed()) { diff --git a/mediapipe/framework/calculator_node.h b/mediapipe/framework/calculator_node.h index 1340461169..26b644dec1 100644 --- a/mediapipe/framework/calculator_node.h +++ b/mediapipe/framework/calculator_node.h @@ -28,13 +28,16 @@ #include #include "absl/base/macros.h" +#include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/synchronization/mutex.h" +#include "absl/time/time.h" #include "mediapipe/framework/calculator.pb.h" #include "mediapipe/framework/calculator_base.h" #include "mediapipe/framework/calculator_context.h" #include "mediapipe/framework/calculator_context_manager.h" #include "mediapipe/framework/calculator_state.h" +#include "mediapipe/framework/graph_runtime_info.pb.h" #include "mediapipe/framework/graph_service_manager.h" #include "mediapipe/framework/input_side_packet_handler.h" #include "mediapipe/framework/input_stream_handler.h" @@ -239,6 +242,14 @@ class CalculatorNode { return node_type_info_->Contract(); } + // Returns the last timestamp bound used to schedule this node. + Timestamp GetLastTimestampBound() const { return last_timestamp_bound_; } + + // Returns the stream monitoring info for this node consisting of a vector of + // tuples of input stream name, queue size, number of packets added, and + // minimum timestamp or bound. + CalculatorRuntimeInfo GetStreamMonitoringInfo() const; + private: // Sets up the output side packets from the main flat array. absl::Status InitializeOutputSidePackets( @@ -376,6 +387,16 @@ class CalculatorNode { const ValidatedGraphConfig* validated_graph_ = nullptr; const NodeTypeInfo* node_type_info_ = nullptr; + + // Keeps track of the latest timestamp bound used to schedule this node. + Timestamp last_timestamp_bound_ = Timestamp::Unset(); + + // Keeps track of the runtime info for this node. + mutable absl::Mutex runtime_info_mutex_; + absl::Time last_process_start_ts_ ABSL_GUARDED_BY(runtime_info_mutex_) = + absl::InfinitePast(); + absl::Time last_process_finish_ts_ ABSL_GUARDED_BY(runtime_info_mutex_) = + absl::InfinitePast(); }; } // namespace mediapipe diff --git a/mediapipe/framework/graph_runtime_info.proto b/mediapipe/framework/graph_runtime_info.proto new file mode 100644 index 0000000000..1488a33f7d --- /dev/null +++ b/mediapipe/framework/graph_runtime_info.proto @@ -0,0 +1,62 @@ +// Copyright 2024 The MediaPipe Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package mediapipe; + +option java_package = "com.google.mediapipe.proto"; +option java_outer_classname = "GraphRuntimeInfoProto"; + +// The runtime info for an input stream. +message StreamRuntimeInfo { + // The name of the stream in the format "TAG:index:stream_name" + string stream_name = 1; + + // The number of packets in the queue. + int32 queue_size = 2; + + // The total number of packets added to the queue. + int32 number_of_packets_added = 3; + + // The minimum timestamp or timestanp bound of the stream. + int64 minimum_timestamp_or_bound = 4; +} + +// The runtime info for a calculator. +message CalculatorRuntimeInfo { + // The name of the calculator. + string calculator_name = 1; + + // The last time when the Calculator::Process was started. + int64 last_process_start_unix_us = 2; + + // The last time when the Calculator::Process was finished. + int64 last_process_finish_unix_us = 3; + + // The timestamp bound of the calculator. + int64 timestamp_bound = 4; + + // The runtime info for each input stream of the calculator. + repeated StreamRuntimeInfo input_stream_infos = 5; +} + +// The runtime info for the whole graph. +message GraphRuntimeInfo { + // The time when the runtime info was captured. + int64 capture_time_unix_us = 1; + + // The runtime info for each calculator in the graph. + repeated CalculatorRuntimeInfo calculator_infos = 2; +} diff --git a/mediapipe/framework/input_stream_handler.cc b/mediapipe/framework/input_stream_handler.cc index 6f44568d82..ee4a86adf2 100644 --- a/mediapipe/framework/input_stream_handler.cc +++ b/mediapipe/framework/input_stream_handler.cc @@ -14,11 +14,15 @@ #include "mediapipe/framework/input_stream_handler.h" +#include +#include #include +#include #include #include "absl/log/absl_check.h" #include "absl/log/absl_log.h" +#include "absl/log/log.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_join.h" #include "absl/strings/substitute.h" @@ -27,8 +31,41 @@ #include "mediapipe/framework/port/ret_check.h" namespace mediapipe { + +namespace { using SyncSet = InputStreamHandler::SyncSet; +// Helper class to vlog the streams with missing packets during FillInputSet +// calls. +class FillInputSetLogger { + public: + FillInputSetLogger(const std::string& node_name, Timestamp timestamp) + : node_name_(node_name), timestamp_(timestamp) {} + ~FillInputSetLogger() { OutputLogs(); } + + void AddMissingPacketStreamName(const std::string& stream_name) { + missing_streams_.push_back(stream_name); + } + + private: + void OutputLogs() const { + if (!missing_streams_.empty()) { + VLOG(1) << absl::StrCat( + node_name_, ": Filled input set at ts: ", timestamp_.DebugString(), + " with MISSING packets in input streams: ", + absl::StrJoin(missing_streams_, ", "), "."); + } else { + VLOG(1) << absl::StrCat( + node_name_, ": Filled input set at ts: ", timestamp_.DebugString()); + } + } + + const std::string node_name_; + const Timestamp timestamp_; + std::vector missing_streams_; +}; +} // namespace + absl::Status InputStreamHandler::InitializeInputStreamManagers( InputStreamManager* flat_input_stream_managers) { for (CollectionItemId id = input_stream_managers_.BeginId(); @@ -60,13 +97,15 @@ std::vector> InputStreamHandler::GetMonitoringInfo() { std::vector> monitoring_info_vector; - for (auto& stream : input_stream_managers_) { + for (CollectionItemId id = input_stream_managers_.BeginId(); + id < input_stream_managers_.EndId(); ++id) { + const auto& stream = input_stream_managers_.Get(id); if (!stream) { continue; } monitoring_info_vector.emplace_back( std::tuple( - stream->Name(), stream->QueueSize(), stream->NumPacketsAdded(), + DebugStreamName(id), stream->QueueSize(), stream->NumPacketsAdded(), stream->MinTimestampOrBound(nullptr))); } return monitoring_info_vector; @@ -427,6 +466,10 @@ void SyncSet::FillInputSet(Timestamp input_timestamp, InputStreamShardSet* input_set) { ABSL_CHECK(input_timestamp.IsAllowedInStream()); ABSL_CHECK(input_set); + std::optional logger; + if (VLOG_IS_ON(1)) { + logger.emplace(input_stream_handler_->GetNodeName(), input_timestamp); + } std::vector streams_with_missing_packets; for (CollectionItemId id : stream_ids_) { const auto& stream = input_stream_handler_->input_stream_managers_.Get(id); @@ -434,9 +477,9 @@ void SyncSet::FillInputSet(Timestamp input_timestamp, bool stream_is_done = false; Packet current_packet = stream->PopPacketAtTimestamp( input_timestamp, &num_packets_dropped, &stream_is_done); - if (current_packet.IsEmpty()) { + if (current_packet.IsEmpty() && logger.has_value()) { // Track the streams that have no packets at the current timestamp. - streams_with_missing_packets.push_back( + logger->AddMissingPacketStreamName( input_stream_handler_->DebugStreamName(id)); } ABSL_CHECK_EQ(num_packets_dropped, 0) @@ -445,17 +488,6 @@ void SyncSet::FillInputSet(Timestamp input_timestamp, input_stream_handler_->AddPacketToShard( &input_set->Get(id), std::move(current_packet), stream_is_done); } - - const std::string node_name = input_stream_handler_->GetNodeName(); - if (!streams_with_missing_packets.empty()) { - VLOG(1) << absl::StrCat( - node_name, ": Filled input set at ts: ", input_timestamp.DebugString(), - " with MISSING packets in input streams: ", - absl::StrJoin(streams_with_missing_packets, ", "), "."); - } else { - VLOG(1) << absl::StrCat( - node_name, ": Filled input set at ts: ", input_timestamp.DebugString()); - } } void SyncSet::FillInputBounds(InputStreamShardSet* input_set) { diff --git a/mediapipe/framework/input_stream_handler.h b/mediapipe/framework/input_stream_handler.h index 2e707af672..cbf9e04db4 100644 --- a/mediapipe/framework/input_stream_handler.h +++ b/mediapipe/framework/input_stream_handler.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -92,8 +93,8 @@ class InputStreamHandler { // Sets up the InputStreamShardSet by propagating data from the managers. absl::Status SetupInputShards(InputStreamShardSet* input_shards); - // Returns a vector of pairs of stream name and queue size for monitoring - // purpose. + // Returns a vector of tuples of stream name, queue size, number of packets + // added, and minimum timestamp or bound for monitoring purpose. std::vector> GetMonitoringInfo(); // Resets the input stream handler and its underlying input streams for diff --git a/mediapipe/framework/tool/BUILD b/mediapipe/framework/tool/BUILD index 76d00abc60..24cd3cf558 100644 --- a/mediapipe/framework/tool/BUILD +++ b/mediapipe/framework/tool/BUILD @@ -978,6 +978,21 @@ cc_test( ], ) +cc_library( + name = "graph_runtime_info_utils", + srcs = ["graph_runtime_info_utils.cc"], + hdrs = ["graph_runtime_info_utils.h"], + visibility = ["//visibility:public"], + deps = [ + "//mediapipe/framework:graph_runtime_info_cc_proto", + "//mediapipe/framework:timestamp", + "@com_google_absl//absl/status:statusor", + "@com_google_absl//absl/strings", + "@com_google_absl//absl/strings:str_format", + "@com_google_absl//absl/time", + ], +) + exports_files( ["build_defs.bzl"], visibility = [ diff --git a/mediapipe/framework/tool/graph_runtime_info_utils.cc b/mediapipe/framework/tool/graph_runtime_info_utils.cc new file mode 100644 index 0000000000..86f28d3183 --- /dev/null +++ b/mediapipe/framework/tool/graph_runtime_info_utils.cc @@ -0,0 +1,86 @@ +#include "mediapipe/framework/tool/graph_runtime_info_utils.h" + +#include +#include + +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" +#include "absl/time/time.h" +#include "mediapipe/framework/timestamp.h" + +namespace mediapipe::tool { + +absl::StatusOr GetGraphRuntimeInfoString( + const GraphRuntimeInfo& graph_runtime_info) { + const absl::Time caputure_time = + absl::FromUnixMicros(graph_runtime_info.capture_time_unix_us()); + std::string calculators_runtime_info_str; + std::vector calculators_with_unprocessed_packets; + std::vector running_calculators; + int num_packets_in_input_queues = 0; + for (const auto& calculator_info : graph_runtime_info.calculator_infos()) { + const bool is_idle = calculator_info.last_process_finish_unix_us() >= + calculator_info.last_process_start_unix_us(); + const std::string calculator_state_str = + is_idle ? absl::StrFormat( + "idle for %.2fs", + absl::ToDoubleSeconds( + caputure_time - + absl::FromUnixMicros( + calculator_info.last_process_finish_unix_us()))) + : absl::StrFormat( + "running for %.2fs", + absl::ToDoubleSeconds( + caputure_time - + absl::FromUnixMicros( + calculator_info.last_process_start_unix_us()))); + if (!is_idle) { + running_calculators.push_back(calculator_info.calculator_name()); + } + absl::StrAppend( + &calculators_runtime_info_str, + absl::StrFormat( + "\n%s: (%s, ts bound : %s)", calculator_info.calculator_name(), + calculator_state_str, + Timestamp::CreateNoErrorChecking(calculator_info.timestamp_bound()) + .DebugString())); + bool calculator_has_unprocessed_packets = false; + for (const auto& input_stream_info : calculator_info.input_stream_infos()) { + num_packets_in_input_queues += input_stream_info.queue_size(); + calculator_has_unprocessed_packets |= input_stream_info.queue_size() > 0; + absl::StrAppend( + &calculators_runtime_info_str, " * ", input_stream_info.stream_name(), + " - queue size: ", input_stream_info.queue_size(), + ", total added: ", input_stream_info.number_of_packets_added(), + ", min ts: ", + Timestamp::CreateNoErrorChecking( + input_stream_info.minimum_timestamp_or_bound()) + .DebugString(), + "\n"); + } + if (calculator_has_unprocessed_packets) { + calculators_with_unprocessed_packets.push_back( + calculator_info.calculator_name()); + } + } + const std::string calulators_with_unprocessed_packets_str = + num_packets_in_input_queues > 0 + ? absl::StrCat( + " (in calculators: ", + absl::StrJoin(calculators_with_unprocessed_packets, ", "), ")") + : ""; + const std::string running_calculators_str = + running_calculators.empty() + ? "None" + : absl::StrCat(" (running calculators: ", + absl::StrJoin(running_calculators, ", "), ")"); + return absl::StrFormat( + "Graph runtime info: \nRunning calculators: %s\nNum packets in input " + "queues: %d%s\n%s\n", + running_calculators_str, num_packets_in_input_queues, + calulators_with_unprocessed_packets_str, calculators_runtime_info_str); +} + +} // namespace mediapipe::tool diff --git a/mediapipe/framework/tool/graph_runtime_info_utils.h b/mediapipe/framework/tool/graph_runtime_info_utils.h new file mode 100644 index 0000000000..0418ffa152 --- /dev/null +++ b/mediapipe/framework/tool/graph_runtime_info_utils.h @@ -0,0 +1,31 @@ +// Copyright 2024 The MediaPipe Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef MEDIAPIPE_FRAMEWORK_TOOL_GRAPH_RUNTIME_INFO_UTILS_H_ +#define MEDIAPIPE_FRAMEWORK_TOOL_GRAPH_RUNTIME_INFO_UTILS_H_ + +#include + +#include "absl/status/statusor.h" +#include "mediapipe/framework/graph_runtime_info.pb.h" + +namespace mediapipe::tool { + +// Returns a human readable representation of the graph runtime info. +absl::StatusOr GetGraphRuntimeInfoString( + const GraphRuntimeInfo& graph_runtime_info); + +} // namespace mediapipe::tool + +#endif // MEDIAPIPE_FRAMEWORK_TOOL_GRAPH_RUNTIME_INFO_UTILS_H_