Skip to content

Commit

Permalink
feat: Specifiy target tracing driver IDs (#11560)
Browse files Browse the repository at this point in the history
Summary:
Add a `driver_ids` flag to pass a comma-separated list of driver IDs string to
specify the drivers to replay. Extracts the number of drivers by listing
the number of sub-directors under the trace directory if `driver_ids` is empty.

Parts of #9668

Pull Request resolved: #11560

Reviewed By: oerling

Differential Revision: D66438878

Pulled By: xiaoxmeng

fbshipit-source-id: be5db2607578965b138aaf7b317f37589999d597
  • Loading branch information
duanmeng authored and facebook-github-bot committed Nov 25, 2024
1 parent 2b5e9f1 commit 22392b1
Show file tree
Hide file tree
Showing 27 changed files with 267 additions and 75 deletions.
7 changes: 7 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@ class TraceScanNode final : public PlanNode {
const PlanNodeId& id,
const std::string& traceDir,
uint32_t pipelineId,
std::vector<uint32_t> driverIds,
const RowTypePtr& outputType)
: PlanNode(id),
traceDir_(traceDir),
pipelineId_(pipelineId),
driverIds_(std::move(driverIds)),
outputType_(outputType) {}

const RowTypePtr& outputType() const override {
Expand All @@ -348,12 +350,17 @@ class TraceScanNode final : public PlanNode {
return pipelineId_;
}

std::vector<uint32_t> driverIds() const {
return driverIds_;
}

private:
void addDetails(std::stringstream& stream) const override;

// Directory of traced data, which is $traceRoot/$taskId/$nodeId.
const std::string traceDir_;
const uint32_t pipelineId_;
const std::vector<uint32_t> driverIds_;
const RowTypePtr outputType_;
};

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/OperatorTraceScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ OperatorTraceScan::OperatorTraceScan(
getOpTraceDirectory(
traceScanNode->traceDir(),
traceScanNode->pipelineId(),
driverCtx->driverId),
traceScanNode->driverIds().at(driverCtx->driverId)),
traceScanNode->outputType(),
memory::MemoryManager::getInstance()->tracePool());
}
Expand Down
15 changes: 9 additions & 6 deletions velox/exec/TraceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <folly/json.h>

#include <numeric>
#include "velox/common/base/Exceptions.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"
Expand Down Expand Up @@ -100,7 +101,7 @@ std::string getOpTraceDirectory(
std::string getOpTraceDirectory(
const std::string& nodeTraceDir,
uint32_t pipelineId,
int driverId) {
uint32_t driverId) {
return fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, driverId);
}

Expand Down Expand Up @@ -232,11 +233,13 @@ std::vector<uint32_t> listDriverIds(
return driverIds;
}

size_t getNumDrivers(
const std::string& nodeTraceDir,
uint32_t pipelineId,
const std::shared_ptr<filesystems::FileSystem>& fs) {
return listDriverIds(nodeTraceDir, pipelineId, fs).size();
std::vector<uint32_t> extractDriverIds(const std::string& driverIds) {
std::vector<uint32_t> driverIdList;
if (driverIds.empty()) {
return driverIdList;
}
folly::split(",", driverIds, driverIdList);
return driverIdList;
}

bool canTrace(const std::string& operatorType) {
Expand Down
11 changes: 3 additions & 8 deletions velox/exec/TraceUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ std::string getOpTraceDirectory(
std::string getOpTraceDirectory(
const std::string& nodeTraceDir,
uint32_t pipelineId,
int driverId);
uint32_t driverId);

/// Returns the file path for a given operator's traced input file.
std::string getOpTraceInputFilePath(const std::string& opTraceDir);
Expand Down Expand Up @@ -122,13 +122,8 @@ std::vector<uint32_t> listDriverIds(
uint32_t pipelineId,
const std::shared_ptr<filesystems::FileSystem>& fs);

/// Extracts the number of drivers by listing the number of sub-directors under
/// the trace directory for a given pipeline. 'nodeTraceDir' is the trace
/// directory of the plan node.
size_t getNumDrivers(
const std::string& nodeTraceDir,
uint32_t pipelineId,
const std::shared_ptr<filesystems::FileSystem>& fs);
/// Extracts the driver IDs from the comma-separated list of driver IDs string.
std::vector<uint32_t> extractDriverIds(const std::string& driverIds);

/// Extracts task ids of the query tracing by listing the query trace directory.
/// 'traceDir' is the root trace directory. 'queryId' is the query id.
Expand Down
6 changes: 3 additions & 3 deletions velox/exec/tests/TraceUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ TEST_F(TraceUtilTest, getDriverIds) {
fs->mkdir(nodeTraceDir);
const uint32_t pipelineId = 1;
fs->mkdir(trace::getPipelineTraceDirectory(nodeTraceDir, pipelineId));
ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 0);
ASSERT_TRUE(listDriverIds(nodeTraceDir, pipelineId, fs).empty());
// create 3 drivers.
const uint32_t driverId1 = 1;
Expand All @@ -205,7 +204,6 @@ TEST_F(TraceUtilTest, getDriverIds) {
fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId2));
const uint32_t driverId3 = 3;
fs->mkdir(trace::getOpTraceDirectory(nodeTraceDir, pipelineId, driverId3));
ASSERT_EQ(getNumDrivers(nodeTraceDir, pipelineId, fs), 3);
auto driverIds = listDriverIds(nodeTraceDir, pipelineId, fs);
ASSERT_EQ(driverIds.size(), 3);
std::sort(driverIds.begin(), driverIds.end());
Expand All @@ -215,7 +213,9 @@ TEST_F(TraceUtilTest, getDriverIds) {
// Bad driver id.
const std::string BadDriverId = "badDriverId";
fs->mkdir(fmt::format("{}/{}/{}", nodeTraceDir, pipelineId, BadDriverId));
ASSERT_ANY_THROW(getNumDrivers(nodeTraceDir, pipelineId, fs));
ASSERT_ANY_THROW(listDriverIds(nodeTraceDir, pipelineId, fs));
ASSERT_EQ(std::vector<uint32_t>({1, 2, 4}), extractDriverIds("1,2,4"));
ASSERT_TRUE(extractDriverIds("").empty());
ASSERT_NE(std::vector<uint32_t>({1, 2}), extractDriverIds("1,2,4"));
}
} // namespace facebook::velox::exec::trace::test
8 changes: 6 additions & 2 deletions velox/exec/tests/utils/HiveConnectorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ void HiveConnectorTestBase::writeToFile(
velox::dwrf::WriterOptions options;
options.config = config;
options.schema = schema;
auto localWriteFile = std::make_unique<LocalWriteFile>(filePath, true, false);
auto fs = filesystems::getFileSystem(filePath, {});
auto writeFile = fs->openFileForWrite(
filePath,
{.shouldCreateParentDirectories = true,
.shouldThrowOnFileAlreadyExists = false});
auto sink = std::make_unique<dwio::common::WriteFileSink>(
std::move(localWriteFile), filePath);
std::move(writeFile), filePath);
auto childPool = rootPool_->addAggregateChild("HiveConnectorTestBase.Writer");
options.memoryPool = childPool.get();
options.flushPolicyFactory = flushPolicyFactory;
Expand Down
7 changes: 6 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,14 @@ PlanBuilder& PlanBuilder::values(
PlanBuilder& PlanBuilder::traceScan(
const std::string& traceNodeDir,
uint32_t pipelineId,
std::vector<uint32_t> driverIds,
const RowTypePtr& outputType) {
planNode_ = std::make_shared<core::TraceScanNode>(
nextPlanNodeId(), traceNodeDir, pipelineId, outputType);
nextPlanNodeId(),
traceNodeDir,
pipelineId,
std::move(driverIds),
outputType);
return *this;
}

Expand Down
4 changes: 4 additions & 0 deletions velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,14 @@ class PlanBuilder {
/// @param traceNodeDir The trace directory for a given plan node.
/// @param pipelineId The pipeline id for the traced operator instantiated
/// from the given plan node.
/// @param driverIds The target driver ID list for replay. The replaying
/// operator uses its driver instance id as the list index to get the traced
/// driver id for replay.
/// @param outputType The type of the tracing data.
PlanBuilder& traceScan(
const std::string& traceNodeDir,
uint32_t pipelineId,
std::vector<uint32_t> driverIds,
const RowTypePtr& outputType);

/// Add an ExchangeNode.
Expand Down
11 changes: 9 additions & 2 deletions velox/tool/trace/AggregationReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ class AggregationReplayer : public OperatorReplayerBase {
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& operatorType)
: OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType) {}
const std::string& operatorType,
const std::string& driverIds)
: OperatorReplayerBase(
traceDir,
queryId,
taskId,
nodeId,
operatorType,
driverIds) {}

private:
core::PlanNodePtr createPlanNode(
Expand Down
11 changes: 9 additions & 2 deletions velox/tool/trace/FilterProjectReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ class FilterProjectReplayer : public OperatorReplayerBase {
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& operatorType)
: OperatorReplayerBase(rootDir, queryId, taskId, nodeId, operatorType) {}
const std::string& operatorType,
const std::string& driverIds)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
operatorType,
driverIds) {}

private:
// Create either a standalone FilterNode, a standalone ProjectNode, or a
Expand Down
1 change: 1 addition & 0 deletions velox/tool/trace/HashJoinReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ core::PlanNodePtr HashJoinReplayer::createPlanNode(
.traceScan(
nodeTraceDir_,
pipelineIds_.at(1), // Build side
driverIds_,
exec::trace::getDataType(planFragment_, nodeId_, 1))
.planNode(),
hashJoinNode->outputType());
Expand Down
11 changes: 9 additions & 2 deletions velox/tool/trace/HashJoinReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ class HashJoinReplayer final : public OperatorReplayerBase {
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& operatorType)
: OperatorReplayerBase(rootDir, queryId, taskId, nodeId, operatorType) {}
const std::string& operatorType,
const std::string& driverIds)
: OperatorReplayerBase(
rootDir,
queryId,
taskId,
nodeId,
operatorType,
driverIds) {}

private:
core::PlanNodePtr createPlanNode(
Expand Down
16 changes: 10 additions & 6 deletions velox/tool/trace/OperatorReplayerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ OperatorReplayerBase::OperatorReplayerBase(
std::string queryId,
std::string taskId,
std::string nodeId,
std::string operatorType)
std::string operatorType,
const std::string& driverIds)
: queryId_(std::string(std::move(queryId))),
taskId_(std::move(taskId)),
nodeId_(std::move(nodeId)),
Expand All @@ -43,10 +44,12 @@ OperatorReplayerBase::OperatorReplayerBase(
nodeTraceDir_(exec::trace::getNodeTraceDirectory(taskTraceDir_, nodeId_)),
fs_(filesystems::getFileSystem(taskTraceDir_, nullptr)),
pipelineIds_(exec::trace::listPipelineIds(nodeTraceDir_, fs_)),
maxDrivers_(exec::trace::getNumDrivers(
nodeTraceDir_,
pipelineIds_.front(),
fs_)) {
driverIds_(
driverIds.empty() ? exec::trace::listDriverIds(
nodeTraceDir_,
pipelineIds_.front(),
fs_)
: exec::trace::extractDriverIds(driverIds)) {
VELOX_USER_CHECK(!taskTraceDir_.empty());
VELOX_USER_CHECK(!taskId_.empty());
VELOX_USER_CHECK(!nodeId_.empty());
Expand All @@ -66,7 +69,7 @@ OperatorReplayerBase::OperatorReplayerBase(
RowVectorPtr OperatorReplayerBase::run() {
const auto restoredPlanNode = createPlan();
return exec::test::AssertQueryBuilder(restoredPlanNode)
.maxDrivers(maxDrivers_)
.maxDrivers(driverIds_.size())
.configs(queryConfigs_)
.connectorSessionProperties(connectorConfigs_)
.copyResults(memory::MemoryManager::getInstance()->tracePool());
Expand All @@ -87,6 +90,7 @@ core::PlanNodePtr OperatorReplayerBase::createPlan() const {
.traceScan(
nodeTraceDir_,
pipelineIds_.front(),
driverIds_,
exec::trace::getDataType(planFragment_, nodeId_))
.addNode(replayNodeFactory(replayNode))
.planNode();
Expand Down
6 changes: 3 additions & 3 deletions velox/tool/trace/OperatorReplayerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class OperatorReplayerBase {
std::string queryId,
std::string taskId,
std::string nodeId,
std::string operatorType);
std::string operatorType,
const std::string& driverIds);
virtual ~OperatorReplayerBase() = default;

OperatorReplayerBase(const OperatorReplayerBase& other) = delete;
Expand All @@ -59,8 +60,7 @@ class OperatorReplayerBase {
const std::string nodeTraceDir_;
const std::shared_ptr<filesystems::FileSystem> fs_;
const std::vector<uint32_t> pipelineIds_;
const uint32_t maxDrivers_;

const std::vector<uint32_t> driverIds_;
const std::shared_ptr<core::PlanNodeIdGenerator> planNodeIdGenerator_{
std::make_shared<core::PlanNodeIdGenerator>()};

Expand Down
11 changes: 9 additions & 2 deletions velox/tool/trace/PartitionedOutputReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,15 @@ PartitionedOutputReplayer::PartitionedOutputReplayer(
const std::string& nodeId,
VectorSerde::Kind serdeKind,
const std::string& operatorType,
const std::string& driverIds,
const ConsumerCallBack& consumerCb)
: OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType),
: OperatorReplayerBase(
traceDir,
queryId,
taskId,
nodeId,
operatorType,
driverIds),
originalNode_(dynamic_cast<const core::PartitionedOutputNode*>(
core::PlanNode::findFirstNode(
planFragment_.get(),
Expand All @@ -134,7 +141,7 @@ RowVectorPtr PartitionedOutputReplayer::run() {
0,
createQueryContext(queryConfigs_, executor_.get()),
Task::ExecutionMode::kParallel);
task->start(maxDrivers_);
task->start(driverIds_.size());

consumeAllData(
bufferManager_,
Expand Down
1 change: 1 addition & 0 deletions velox/tool/trace/PartitionedOutputReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PartitionedOutputReplayer final : public OperatorReplayerBase {
const std::string& nodeId,
VectorSerde::Kind serdeKind,
const std::string& operatorType,
const std::string& driverIds,
const ConsumerCallBack& consumerCb = [](auto partition, auto page) {});

RowVectorPtr run() override;
Expand Down
11 changes: 3 additions & 8 deletions velox/tool/trace/TableScanReplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace facebook::velox::tool::trace {
RowVectorPtr TableScanReplayer::run() {
const auto plan = createPlan();
return exec::test::AssertQueryBuilder(plan)
.maxDrivers(maxDrivers_)
.maxDrivers(driverIds_.size())
.configs(queryConfigs_)
.connectorSessionProperties(connectorConfigs_)
.splits(getSplits())
Expand All @@ -52,14 +52,9 @@ core::PlanNodePtr TableScanReplayer::createPlanNode(

std::vector<exec::Split> TableScanReplayer::getSplits() const {
std::vector<std::string> splitInfoDirs;
if (driverId_ != -1) {
for (const auto driverId : driverIds_) {
splitInfoDirs.push_back(exec::trace::getOpTraceDirectory(
nodeTraceDir_, pipelineIds_.front(), driverId_));
} else {
for (auto i = 0; i < maxDrivers_; ++i) {
splitInfoDirs.push_back(exec::trace::getOpTraceDirectory(
nodeTraceDir_, pipelineIds_.front(), i));
}
nodeTraceDir_, pipelineIds_.front(), driverId));
}
const auto splitStrs =
exec::trace::OperatorTraceSplitReader(
Expand Down
13 changes: 8 additions & 5 deletions velox/tool/trace/TableScanReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ class TableScanReplayer final : public OperatorReplayerBase {
const std::string& taskId,
const std::string& nodeId,
const std::string& operatorType,
const int32_t driverId = -1)
: OperatorReplayerBase(traceDir, queryId, taskId, nodeId, operatorType),
driverId_(driverId) {}
const std::string& driverIds)
: OperatorReplayerBase(
traceDir,
queryId,
taskId,
nodeId,
operatorType,
driverIds) {}

RowVectorPtr run() override;

Expand All @@ -46,8 +51,6 @@ class TableScanReplayer final : public OperatorReplayerBase {
const core::PlanNodePtr& /*source*/) const override;

std::vector<exec::Split> getSplits() const;

const int32_t driverId_;
};

} // namespace facebook::velox::tool::trace
Loading

0 comments on commit 22392b1

Please sign in to comment.