Skip to content

Commit

Permalink
Adds GetGraphRuntimeInfo methods which generates runtime debugging in…
Browse files Browse the repository at this point in the history
…formation about the state of InputStreams.

PiperOrigin-RevId: 698330776
  • Loading branch information
MediaPipe Team authored and copybara-github committed Nov 20, 2024
1 parent 7a70aac commit 17fbbeb
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 28 deletions.
16 changes: 15 additions & 1 deletion mediapipe/framework/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ cc_library(
":delegating_executor",
":executor",
":graph_output_stream",
":graph_runtime_info_cc_proto",
":graph_service",
":graph_service_manager",
":input_stream_manager",
Expand All @@ -359,8 +360,8 @@ cc_library(
":timestamp",
":validated_graph_config",
":vlog_overrides",
"//mediapipe/framework/deps:clock",
"//mediapipe/framework/port:core_proto",
"//mediapipe/framework/port:integral_types",
"//mediapipe/framework/port:logging",
"//mediapipe/framework/port:map_util",
"//mediapipe/framework/port:ret_check",
Expand All @@ -387,6 +388,7 @@ cc_library(
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)

Expand Down Expand Up @@ -425,6 +427,7 @@ cc_library(
":calculator_context_manager",
":calculator_state",
":counter_factory",
":graph_runtime_info_cc_proto",
":graph_service_manager",
":input_side_packet_handler",
":input_stream_handler",
Expand All @@ -441,6 +444,7 @@ cc_library(
":stream_handler_cc_proto",
":timestamp",
":validated_graph_config",
"//mediapipe/framework/deps:clock",
"//mediapipe/framework/port:core_proto",
"//mediapipe/framework/port:integral_types",
"//mediapipe/framework/port:logging",
Expand All @@ -454,12 +458,14 @@ cc_library(
"//mediapipe/framework/tool:tag_map",
"//mediapipe/framework/tool:validate_name",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/cleanup",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)

Expand Down Expand Up @@ -647,6 +653,12 @@ cc_library(
],
)

mediapipe_proto_library(
name = "graph_runtime_info_proto",
srcs = ["graph_runtime_info.proto"],
visibility = ["//visibility:public"],
)

cc_library(
name = "graph_service",
hdrs = ["graph_service.h"],
Expand Down Expand Up @@ -785,6 +797,7 @@ cc_library(
"//mediapipe/framework/port:ret_check",
"//mediapipe/framework/port:status",
"//mediapipe/framework/tool:tag_map",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/strings",
Expand Down Expand Up @@ -1620,6 +1633,7 @@ cc_test(
"//mediapipe/calculators/core:counting_source_calculator",
"//mediapipe/calculators/core:mux_calculator",
"//mediapipe/calculators/core:pass_through_calculator",
"//mediapipe/framework/deps:clock",
"//mediapipe/framework/port:gtest_main",
"//mediapipe/framework/port:parse_text_proto",
"//mediapipe/framework/port:ret_check",
Expand Down
13 changes: 13 additions & 0 deletions mediapipe/framework/calculator_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/delegating_executor.h"
#include "mediapipe/framework/deps/clock.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/graph_output_stream.h"
#include "mediapipe/framework/graph_service_manager.h"
Expand Down Expand Up @@ -914,6 +916,17 @@ absl::Status CalculatorGraph::WaitForObservedOutput() {
return scheduler_.WaitForObservedOutput();
}

absl::StatusOr<GraphRuntimeInfo> CalculatorGraph::GetGraphRuntimeInfo() {
RET_CHECK(initialized_);
GraphRuntimeInfo info;
for (const auto& node : nodes_) {
*info.add_calculator_infos() = node->GetStreamMonitoringInfo();
}
const absl::Time time_now = mediapipe::Clock::RealClock()->TimeNow();
info.set_capture_time_unix_us(absl::ToUnixMicros(time_now));
return info;
}

absl::Status CalculatorGraph::AddPacketToInputStream(
absl::string_view stream_name, const Packet& packet) {
return AddPacketToInputStreamInternal(stream_name, packet);
Expand Down
6 changes: 6 additions & 0 deletions mediapipe/framework/calculator_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/graph_output_stream.h"
#include "mediapipe/framework/graph_runtime_info.pb.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
Expand Down Expand Up @@ -257,6 +258,11 @@ class CalculatorGraph {
// Quick non-locking means of checking if the graph has encountered an error.
bool HasError() const { return has_error_; }

// Returns debugging information about the graph transient state, including
// information about all input streams and their timestamp bounds. This method
// is thread safe and can be called from any thread.
absl::StatusOr<GraphRuntimeInfo> GetGraphRuntimeInfo();

// Add a Packet to a graph input stream based on the graph input stream add
// mode. If the mode is ADD_IF_NOT_FULL, the packet will not be added if any
// queue exceeds max_queue_size specified by the graph config and will return
Expand Down
7 changes: 4 additions & 3 deletions mediapipe/framework/calculator_graph_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/collection_item_id.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/deps/clock.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/input_stream_handler.h"
#include "mediapipe/framework/lifetime_tracker.h"
Expand Down Expand Up @@ -70,7 +71,6 @@
#include "mediapipe/gpu/gpu_service.h"

namespace mediapipe {

namespace {

constexpr char kCounter2Tag[] = "COUNTER2";
Expand All @@ -83,8 +83,9 @@ constexpr char kOutputTag[] = "OUTPUT";
constexpr char kInputTag[] = "INPUT";
constexpr char kSelectTag[] = "SELECT";

using testing::ElementsAre;
using testing::HasSubstr;
using ::mediapipe::Clock;
using ::testing::ElementsAre;
using ::testing::HasSubstr;

// Pass packets through. Note that it calls SetOffset() in Process()
// instead of Open().
Expand Down
60 changes: 53 additions & 7 deletions mediapipe/framework/calculator_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,28 @@
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/calculator_state.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/deps/clock.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/input_stream_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
Expand All @@ -54,6 +60,7 @@

namespace mediapipe {

using ::mediapipe::Clock;
namespace {

const PacketType* GetPacketType(const PacketTypeSet& packet_type_set,
Expand Down Expand Up @@ -116,7 +123,11 @@ std::unique_ptr<PacketTypeSet> RemoveOmittedPacketTypes(

} // namespace

CalculatorNode::CalculatorNode() {}
CalculatorNode::CalculatorNode() {
absl::Time now = Clock::RealClock()->TimeNow();
last_process_start_ts_ = now;
last_process_finish_ts_ = now;
}

Timestamp CalculatorNode::SourceProcessOrder(
const CalculatorContext* cc) const {
Expand Down Expand Up @@ -218,6 +229,29 @@ absl::Status CalculatorNode::Initialize(
return InitializeInputStreams(input_stream_managers, output_stream_managers);
}

CalculatorRuntimeInfo CalculatorNode::GetStreamMonitoringInfo() const {
CalculatorRuntimeInfo calulator_info;
calulator_info.set_calculator_name(DebugName());
{
absl::MutexLock lock(&runtime_info_mutex_);
calulator_info.set_last_process_start_unix_us(
absl::ToUnixMicros(last_process_start_ts_));
calulator_info.set_last_process_finish_unix_us(
absl::ToUnixMicros(last_process_finish_ts_));
}
const auto monitoring_info = input_stream_handler_->GetMonitoringInfo();
for (const auto& [stream_name, queue_size, num_packets_added,
minimum_timestamp_or_bound] : monitoring_info) {
auto* stream_info = calulator_info.add_input_stream_infos();
stream_info->set_stream_name(stream_name);
stream_info->set_queue_size(queue_size);
stream_info->set_number_of_packets_added(num_packets_added);
stream_info->set_minimum_timestamp_or_bound(
minimum_timestamp_or_bound.Value());
}
return calulator_info;
}

absl::Status CalculatorNode::InitializeOutputSidePackets(
const PacketTypeSet& output_side_packet_types,
OutputSidePacketImpl* output_side_packets) {
Expand Down Expand Up @@ -669,14 +703,14 @@ void CalculatorNode::SchedulingLoop() {
max_allowance = max_in_flight_ - current_in_flight_;
}
while (true) {
Timestamp input_bound;
// input_bound is set to a meaningful value iff the latest readiness of the
// node is kNotReady when ScheduleInvocations() returns.
input_stream_handler_->ScheduleInvocations(max_allowance, &input_bound);
if (input_bound != Timestamp::Unset()) {
// last_timestamp_bound_ is set to a meaningful value iff the latest
// readiness of the node is kNotReady when ScheduleInvocations() returns.
input_stream_handler_->ScheduleInvocations(max_allowance,
&last_timestamp_bound_);
if (last_timestamp_bound_ != Timestamp::Unset()) {
// Updates the minimum timestamp for which a new packet could possibly
// arrive.
output_stream_handler_->UpdateTaskTimestampBound(input_bound);
output_stream_handler_->UpdateTaskTimestampBound(last_timestamp_bound_);
}

{
Expand Down Expand Up @@ -805,6 +839,18 @@ std::string CalculatorNode::DebugName() const {
// TODO: Split this function.
absl::Status CalculatorNode::ProcessNode(
CalculatorContext* calculator_context) {
// Update calculator runtime info.
{
absl::MutexLock lock(&runtime_info_mutex_);
last_process_start_ts_ = Clock::RealClock()->TimeNow();
}
absl::Cleanup last_process_finish_ts_cleanup([this]() {
{
absl::MutexLock lock(&runtime_info_mutex_);
last_process_finish_ts_ = Clock::RealClock()->TimeNow();
}
});

if (IsSource()) {
// This is a source Calculator.
if (Closed()) {
Expand Down
21 changes: 21 additions & 0 deletions mediapipe/framework/calculator_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
#include <unordered_set>

#include "absl/base/macros.h"
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/calculator_context.h"
#include "mediapipe/framework/calculator_context_manager.h"
#include "mediapipe/framework/calculator_state.h"
#include "mediapipe/framework/graph_runtime_info.pb.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/input_side_packet_handler.h"
#include "mediapipe/framework/input_stream_handler.h"
Expand Down Expand Up @@ -239,6 +242,14 @@ class CalculatorNode {
return node_type_info_->Contract();
}

// Returns the last timestamp bound used to schedule this node.
Timestamp GetLastTimestampBound() const { return last_timestamp_bound_; }

// Returns the stream monitoring info for this node consisting of a vector of
// tuples of input stream name, queue size, number of packets added, and
// minimum timestamp or bound.
CalculatorRuntimeInfo GetStreamMonitoringInfo() const;

private:
// Sets up the output side packets from the main flat array.
absl::Status InitializeOutputSidePackets(
Expand Down Expand Up @@ -376,6 +387,16 @@ class CalculatorNode {
const ValidatedGraphConfig* validated_graph_ = nullptr;

const NodeTypeInfo* node_type_info_ = nullptr;

// Keeps track of the latest timestamp bound used to schedule this node.
Timestamp last_timestamp_bound_ = Timestamp::Unset();

// Keeps track of the runtime info for this node.
mutable absl::Mutex runtime_info_mutex_;
absl::Time last_process_start_ts_ ABSL_GUARDED_BY(runtime_info_mutex_) =
absl::InfinitePast();
absl::Time last_process_finish_ts_ ABSL_GUARDED_BY(runtime_info_mutex_) =
absl::InfinitePast();
};

} // namespace mediapipe
Expand Down
62 changes: 62 additions & 0 deletions mediapipe/framework/graph_runtime_info.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package mediapipe;

option java_package = "com.google.mediapipe.proto";
option java_outer_classname = "GraphRuntimeInfoProto";

// The runtime info for an input stream.
message StreamRuntimeInfo {
// The name of the stream in the format "TAG:index:stream_name"
string stream_name = 1;

// The number of packets in the queue.
int32 queue_size = 2;

// The total number of packets added to the queue.
int32 number_of_packets_added = 3;

// The minimum timestamp or timestanp bound of the stream.
int64 minimum_timestamp_or_bound = 4;
}

// The runtime info for a calculator.
message CalculatorRuntimeInfo {
// The name of the calculator.
string calculator_name = 1;

// The last time when the Calculator::Process was started.
int64 last_process_start_unix_us = 2;

// The last time when the Calculator::Process was finished.
int64 last_process_finish_unix_us = 3;

// The timestamp bound of the calculator.
int64 timestamp_bound = 4;

// The runtime info for each input stream of the calculator.
repeated StreamRuntimeInfo input_stream_infos = 5;
}

// The runtime info for the whole graph.
message GraphRuntimeInfo {
// The time when the runtime info was captured.
int64 capture_time_unix_us = 1;

// The runtime info for each calculator in the graph.
repeated CalculatorRuntimeInfo calculator_infos = 2;
}
Loading

0 comments on commit 17fbbeb

Please sign in to comment.