Skip to content

Commit

Permalink
No public description
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 704763055
  • Loading branch information
MediaPipe Team authored and copybara-github committed Dec 11, 2024
1 parent 43dea0a commit 45d2f95
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 37 deletions.
67 changes: 60 additions & 7 deletions docs/getting_started/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ nav_order: 10
[https://developers.google.com/mediapipe](https://developers.google.com/mediapipe)
as the primary developer documentation site for MediaPipe as of April 3, 2023.*

----
--------------------------------------------------------------------------------

## Missing Python binary path

Expand Down Expand Up @@ -113,9 +113,10 @@ ERROR: Could not find a version that satisfies the requirement mediapipe
ERROR: No matching distribution found for mediapipe
```

after running `pip install mediapipe` usually indicates that there is no qualified MediaPipe Python for your system.
Please note that MediaPipe Python PyPI officially supports the **64-bit**
version of Python 3.7 to 3.10 on the following OS:
after running `pip install mediapipe` usually indicates that there is no
qualified MediaPipe Python for your system. Please note that MediaPipe Python
PyPI officially supports the **64-bit** version of Python 3.7 to 3.10 on the
following OS:

- x86_64 Linux
- x86_64 macOS 10.15+
Expand Down Expand Up @@ -270,6 +271,58 @@ calculators designed specifically for this purpose such as
[`FlowLimiterCalculator`] as described in
[`How to process realtime input streams`].

## Monitor calculator inputs and timestamp settlements

Debugging MediaPipe calculators often requires a deep understanding of the data
flow and timestamp synchronization. Incoming packets to calculators are first
buffered in input queues per stream to be synchronized by the assigned
`InputStreamHandler`. The `InputStreamHandler` job is to determine the input
packet set for a settled timestamp, which puts the calculator into a “ready”
state, followed by triggering a Calculator::Process call with the determined
packet set as input.

The `DebugInputStreamHandler` can be used to track incoming packets and
timestamp settlements in real-time in the application's LOG(INFO) output. It can
be assigned to specific calculators via the Calculator's input_stream_handler or
graph globally via the `CalculatorGraphConfig`'s input_stream_handler field.

During the graph execution, incoming packets generate LOG messages which reveal
the timestamp and type of the packet, followed by the current state of all input
queues:

```
[INFO] SomeCalculator: Adding packet (ts:2, type:int) to stream INPUT_B:0:input_b
[INFO] SomeCalculator: INPUT_A:0:input_a num_packets: 0 min_ts: 2
[INFO] SomeCalculator: INPUT_B:0:input_b num_packets: 1 min_ts: 2
```

In addition, it enables the monitoring of timestamp settlement events (in case
the `DefaultInputStreamHandler` is applied). This can help to reveal an
unexpected timestamp bound increase on input streams resulting in a
Calculator::Process call with an incomplete input set resulting in empty packets
on (potentially required) input streams.

*Example scenario:*

```
node {
calculator: "SomeCalculator"
input_stream: "INPUT_A:a"
input_stream: "INPUT_B:b"
...
}
```

Given a calculator with two inputs, receiving an incoming packet with timestamp
1 on stream A followed by an input packet with timestamp 2 on stream B. The
timestamp bound increase to 2 on stream B with pending input packet on stream A
at timestamp 1 triggers the Calculator::Process call with an incomplete input
set for timestamp 1. In this case, the `DefaultInputStreamHandler` outputs:

```
[INFO] SomeCalculator: Filled input set at ts: 1 with MISSING packets in input streams: INPUT_B:0:input_b.
```

## VLOG is your friend

MediaPipe uses `VLOG` in many places to log important events for debugging
Expand All @@ -287,8 +340,9 @@ purposes which are applied when `CalculatorGraph` is created.

Overrides:

- `MEDIAPIPE_VLOG_V`: define and provide value you provide for `--v`
- `MEDIAPIPE_VLOG_VMODULE`: define and provide value you provide for `--vmodule`
- `MEDIAPIPE_VLOG_V`: define and provide value you provide for `--v`
- `MEDIAPIPE_VLOG_VMODULE`: define and provide value you provide for
`--vmodule`

You can set overrides by adding:
`--copt=-DMEDIAPIPE_VLOG_VMODULE=\"*calculator*=5\"`
Expand Down Expand Up @@ -326,4 +380,3 @@ To disable support for `avxvnniint8`, add the following to you `.bazelrc`:
```
build --define=xnn_enable_avxvnniint8=false
```

4 changes: 2 additions & 2 deletions mediapipe/framework/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1189,12 +1189,12 @@ cc_library(
":calculator_node",
":executor",
"//mediapipe/framework/deps:clock",
"//mediapipe/framework/port:integral_types",
"//mediapipe/framework/port:logging",
"//mediapipe/framework/port:ret_check",
"//mediapipe/framework/port:status",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/strings:string_view",
"@com_google_absl//absl/synchronization",
],
)
Expand Down
8 changes: 6 additions & 2 deletions mediapipe/framework/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

#include "mediapipe/framework/scheduler.h"

#include <functional>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include "absl/log/absl_check.h"
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/executor.h"
Expand All @@ -36,8 +38,10 @@ namespace mediapipe {

namespace internal {

inline constexpr absl::string_view kDefaultQueueName = "default_queue";

Scheduler::Scheduler(CalculatorGraph* graph)
: graph_(graph), shared_(), default_queue_(&shared_) {
: graph_(graph), shared_(), default_queue_(kDefaultQueueName, &shared_) {
shared_.error_callback =
std::bind(&CalculatorGraph::RecordError, graph_, std::placeholders::_1);
default_queue_.SetIdleCallback(std::bind(&Scheduler::QueueIdleStateChanged,
Expand Down Expand Up @@ -90,7 +94,7 @@ absl::Status Scheduler::SetNonDefaultExecutor(const std::string& name,
"be called after the scheduler "
"has started";
auto inserted = non_default_queues_.emplace(
name, absl::make_unique<SchedulerQueue>(&shared_));
name, absl::make_unique<SchedulerQueue>(name, &shared_));
RET_CHECK(inserted.second)
<< "SetNonDefaultExecutor must be called only once for the executor \""
<< name << "\"";
Expand Down
32 changes: 19 additions & 13 deletions mediapipe/framework/scheduler_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@

#include "mediapipe/framework/scheduler_queue.h"

#include <memory>
#include <cstdint>
#include <queue>
#include <utility>

#include "absl/log/absl_check.h"
#include "absl/log/absl_log.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator_node.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/port/canonical_errors.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/status.h"

#ifdef __APPLE__
#define AUTORELEASEPOOL @autoreleasepool
Expand Down Expand Up @@ -97,7 +95,7 @@ void SchedulerQueue::Reset() {
void SchedulerQueue::SetExecutor(Executor* executor) { executor_ = executor; }

bool SchedulerQueue::IsIdle() {
VLOG(3) << "Scheduler queue empty: " << queue_.empty()
VLOG(3) << "Scheduler queue (" << queue_name_ << ") empty: " << queue_.empty()
<< ", # of pending tasks: " << num_pending_tasks_;
return queue_.empty() && num_pending_tasks_ == 0;
}
Expand Down Expand Up @@ -140,7 +138,8 @@ void SchedulerQueue::AddItemToQueue(Item&& item) {
was_idle = IsIdle();
queue_.push(item);
++num_tasks_to_add_;
VLOG(4) << node->DebugName() << " was added to the scheduler queue.";
VLOG(4) << node->DebugName() << " was added to the scheduler queue ("
<< queue_name_ << ")";

// Now grab the tasks to execute while still holding the lock. This will
// gather any waiting tasks, in addition to the one we just added.
Expand Down Expand Up @@ -235,13 +234,15 @@ void SchedulerQueue::RunNextTask() {

void SchedulerQueue::RunCalculatorNode(CalculatorNode* node,
CalculatorContext* cc) {
VLOG(3) << "Running " << node->DebugName();
VLOG(3) << "Running " << node->DebugName() << " on queue (" << queue_name_
<< ")";

// If we are in the process of stopping the graph (due to tool::StatusStop()
// from a non-source node or due to CalculatorGraph::CloseAllPacketSources),
// we should not run any more sources. Close the node if it is a source.
if (shared_->stopping && node->IsSource()) {
VLOG(4) << "Closing " << node->DebugName() << " due to StatusStop().";
VLOG(4) << "Closing " << node->DebugName()
<< " due to StatusStop() on queue (" << queue_name_ << ").";
int64_t start_time = shared_->timer.StartNode();
// It's OK to not reset/release the prepared CalculatorContext since a
// source node always reuses the same CalculatorContext and Close() doesn't
Expand All @@ -252,7 +253,8 @@ void SchedulerQueue::RunCalculatorNode(CalculatorNode* node,
shared_->timer.EndNode(start_time);
if (!result.ok()) {
VLOG(3) << node->DebugName()
<< " had an error while closing due to StatusStop()!";
<< " had an error while closing due to StatusStop()! on queue ("
<< queue_name_ << ")";
shared_->error_callback(result);
}
} else {
Expand All @@ -273,23 +275,27 @@ void SchedulerQueue::RunCalculatorNode(CalculatorNode* node,
shared_->stopping = true;
} else {
// If we have an error in this calculator.
VLOG(3) << node->DebugName() << " had an error!";
VLOG(3) << node->DebugName() << " had an error on queue ("
<< queue_name_ << ")!";
shared_->error_callback(result);
}
}
}

VLOG(4) << "Done running " << node->DebugName();
VLOG(4) << "Done running " << node->DebugName() << " on queue ("
<< queue_name_ << ")";
node->EndScheduling();
}

void SchedulerQueue::OpenCalculatorNode(CalculatorNode* node) {
VLOG(3) << "Opening " << node->DebugName();
VLOG(3) << "Opening " << node->DebugName() << " on queue (" << queue_name_
<< ")";
int64_t start_time = shared_->timer.StartNode();
const absl::Status result = node->OpenNode();
shared_->timer.EndNode(start_time);
if (!result.ok()) {
VLOG(3) << node->DebugName() << " had an error!";
VLOG(3) << node->DebugName() << " had an error on queue (" << queue_name_
<< ")!";
shared_->error_callback(result);
return;
}
Expand Down
12 changes: 8 additions & 4 deletions mediapipe/framework/scheduler_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
#ifndef MEDIAPIPE_FRAMEWORK_SCHEDULER_QUEUE_H_
#define MEDIAPIPE_FRAMEWORK_SCHEDULER_QUEUE_H_

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>

#include "absl/base/macros.h"
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator_context.h"
#include "mediapipe/framework/executor.h"
Expand Down Expand Up @@ -76,7 +76,8 @@ class SchedulerQueue : public TaskQueue {
bool is_open_node_ = false; // True if the task should run OpenNode().
};

explicit SchedulerQueue(SchedulerShared* shared) : shared_(shared) {}
explicit SchedulerQueue(absl::string_view queue_name, SchedulerShared* shared)
: queue_name_(queue_name), shared_(shared) {}

// Sets the executor that will run the nodes. Must be called before the
// scheduler is started.
Expand Down Expand Up @@ -145,6 +146,9 @@ class SchedulerQueue : public TaskQueue {
// Checks whether the queue has no queued nodes or pending tasks.
bool IsIdle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);

// Queue name for logging purposes.
const std::string queue_name_;

Executor* executor_ = nullptr;

IdleCallback idle_callback_;
Expand Down
2 changes: 1 addition & 1 deletion mediapipe/framework/tool/graph_runtime_info_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class GraphRuntimeInfoLogger {

absl::Notification shutdown_signal_;
absl::Notification is_running_;
ThreadPool thread_pool_;
absl::AnyInvocable<absl::StatusOr<GraphRuntimeInfo>()> get_runtime_info_fn_;
ThreadPool thread_pool_;
};

} // namespace mediapipe::tool
Expand Down
21 changes: 13 additions & 8 deletions third_party/opencv_macos.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,31 @@ exports_files(["LICENSE"])

# Example configurations:
#
# To configure OpenCV 3, obtain the path of OpenCV 3 from Homebrew:
# # OpenCV 3
# To configure OpenCV 3, obtain the path of OpenCV 3 from Homebrew. The
# following commands show the output of the command with version 3.4.16_10:
#
# $ brew ls opencv@3 | grep version.hpp
# $ /opt/homebrew/Cellar/opencv@3/3.4.16_10/include/opencv2/core/version.hpp
#
# Then set path in "macos_opencv" rule in the WORKSPACE file to
# "/opt/homebrew/Cellar" and the PREFIX below to "opencv@3/3.4.16_10".
# "/opt/homebrew/Cellar" and the PREFIX below to "opencv/<version>" (e.g.
# "opencv/3.4.16_10" for the example above).
#
#
# To configure OpenCV 4, obtain the path of OpenCV 4 from Homebrew:
# # OpenCV 4
# To configure OpenCV 4, obtain the path of OpenCV 4 from Homebrew. The
# following commands show the output of the command with version 4.10.0_12:
#
# $ brew ls opencv | grep version.hpp
# $ /opt/homebrew/Cellar/opencv/4.10.0_12/include/opencv4/opencv2/core/version.hpp
# $ /opt/homebrew/Cellar/opencv/4.10.0_12/include/opencv4/opencv2/dnn/version.hpp
#
# Then set path in "macos_opencv" rule in the WORKSPACE file to
# "/opt/homebrew/Cellar" and the PREFIX below to "opencv/4.10.0_12". For OpenCV
# 4, you will also need to adjust the include paths. The header search path
# should be "include/opencv4/opencv2/**/*.h*" and the include prefix needs to
# be set to "include/opencv4".
# "/opt/homebrew/Cellar" and the PREFIX below to "opencv/<version>" (e.g.
# "opencv/4.10.0_12" for the example above). For OpenCV 4, you will also need to
# adjust the include paths. The header search path should be
# "include/opencv4/opencv2/**/*.h*" and the include prefix needs to be set to
# "include/opencv4".

PREFIX = "opt/opencv@3"

Expand Down

0 comments on commit 45d2f95

Please sign in to comment.