Add printouts to debug null splits
yingsu00 committed Mar 19, 2024
1 parent 01d2e5b commit c3a50d7
Showing 9 changed files with 132 additions and 19 deletions.
12 changes: 10 additions & 2 deletions velox/connectors/hive/HiveConnectorSplit.h
@@ -70,15 +70,23 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
infoColumns(_infoColumns) {}

std::string toString() const override {
    std::string result;
    if (tableBucketNumber.has_value()) {
      result += fmt::format(
          "Hive: {} {} - {} {}",
          filePath,
          start,
          length,
          tableBucketNumber.value());
    } else {
      result += fmt::format("Hive: {} {} - {}", filePath, start, length);
    }

    for (const auto& entry : partitionKeys) {
      result += fmt::format(
          " Partition keys {}:{}", entry.first, entry.second.value_or("null"));
    }

    return result;
}

std::string getFileName() const {
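For reference, the new toString() composes the split description as sketched below. This is a self-contained illustration with stand-in local variables, not part of the commit; the value_or("null") rendering of absent partition values and the parameter types are assumptions.

    #include <cstdint>
    #include <fmt/format.h>
    #include <map>
    #include <optional>
    #include <string>

    // Stand-alone sketch of the string built by the new toString().
    std::string splitToString(
        const std::string& filePath,
        uint64_t start,
        uint64_t length,
        std::optional<int32_t> tableBucketNumber,
        const std::map<std::string, std::optional<std::string>>& partitionKeys) {
      std::string result;
      if (tableBucketNumber.has_value()) {
        result += fmt::format(
            "Hive: {} {} - {} {}", filePath, start, length, tableBucketNumber.value());
      } else {
        result += fmt::format("Hive: {} {} - {}", filePath, start, length);
      }
      for (const auto& entry : partitionKeys) {
        result += fmt::format(
            " Partition keys {}:{}", entry.first, entry.second.value_or("null"));
      }
      return result;
    }

    // splitToString("/data/f.orc", 0, 1024, std::nullopt, {{"ds", "2024-03-19"}})
    // yields "Hive: /data/f.orc 0 - 1024 Partition keys ds:2024-03-19".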
55 changes: 46 additions & 9 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -578,41 +578,78 @@ bool testFilters(
auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
const auto& rowType = reader->rowType();
VLOG(1) << "testFilters begin rowType=" << rowType->toString()
<< " rowType->size()=" << rowType->size()
<< " scanSpec->fieldName()=" << scanSpec->fieldName()
<< " scanSpec->children().size()=" << scanSpec->children().size();
std::string partitionKeyStr("");
for (auto& entry : partitionKey) {
partitionKeyStr += fmt::format("{}:{} ", entry.first, entry.second);
}
VLOG(1) << " partitionKeys: " << partitionKeyStr;
if (totalRows.has_value()) {
VLOG(1) << " totalRows=" << totalRows.value();
}

  for (const auto& child : scanSpec->children()) {
    VLOG(1) << " child spec " << child->fieldName() << " has filter? "
            << (child->filter() != nullptr);
    if (child->filter()) {
      VLOG(1) << " filter=" << child->filter()->toString()
              << " deterministic=" << child->filter()->isDeterministic()
              << " testNull()=" << child->filter()->testNull();
      const auto& name = child->fieldName();
      VLOG(1) << " rowType->containsChild(name) "
              << rowType->containsChild(name);
      if (!rowType->containsChild(name)) {
        // The column is missing from the file. Check if it is a partition
        // key.
        auto iter = partitionKey.find(name);
        VLOG(1) << " found partition key? " << (iter != partitionKey.end());
        if (iter != partitionKey.end() && iter->second.has_value()) {
          VLOG(1) << " found partition key and it has value "
                  << iter->second.value();
          if (!applyPartitionFilter(
                  (*partitionKeysHandle)[name]->dataType()->kind(),
                  iter->second.value(),
                  child->filter())) {
            VLOG(1) << " Skipping " << filePath
                    << " based on filter for partitioning column "
                    << child->fieldName() << " with value "
                    << iter->second.value();
            return false;
          }
        } else if (
            child->filter()->isDeterministic() &&
            !child->filter()->testNull()) {
          // Column is missing. Most likely due to schema evolution.
          VLOG(1) << " Skipping " << filePath << " for column "
                  << child->fieldName() << " with NULL value";
          return false;
        } else {
          VLOG(1) << " missing column, filter passes NULL or is"
                  << " non-deterministic";
        }
      } else {
        const auto& typeWithId = fileTypeWithId->childByName(name);
        auto columnStats = reader->columnStatistics(typeWithId->id());
        VLOG(1) << " rowType containsChild " << name << " columnStats "
                << columnStats.get();
        if (columnStats != nullptr) {
          if (!testFilter(
                  child->filter(),
                  columnStats.get(),
                  totalRows.value(),
                  typeWithId->type())) {
            VLOG(1) << " Skipping " << filePath
                    << " based on stats and filter for column "
                    << child->fieldName();
            return false;
          } else {
            VLOG(1) << " normal case, filter passed";
          }
        }
      }
    }
  }

VLOG(1) << "testFilters end filter passed";
return true;
}

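The testFilters() changes above instrument the split-pruning decision: a split can be skipped when a column's statistics prove that no row can satisfy a filter. Below is a minimal, self-contained sketch of that idea; ColumnStats and the greater-than check are simplified stand-ins for velox's dwio::common::ColumnStatistics and common::Filter, not the real types.

    #include <cstdint>
    #include <iostream>
    #include <optional>

    struct ColumnStats {
      std::optional<int64_t> min;
      std::optional<int64_t> max;
    };

    // Returns true if some row in [min, max] may pass "value > lowerBound".
    bool mayPassGreaterThan(const ColumnStats& stats, int64_t lowerBound) {
      if (!stats.max.has_value()) {
        return true; // No stats: cannot prune, the split must be read.
      }
      return stats.max.value() > lowerBound;
    }

    int main() {
      ColumnStats stats{10, 100};
      // Filter "c > 200": max is 100, so no row can qualify; skip the split.
      std::cout << "skip split: " << std::boolalpha
                << !mayPassGreaterThan(stats, 200) << std::endl;
      return 0;
    }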
33 changes: 27 additions & 6 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -128,11 +128,11 @@ HiveDataSource::HiveDataSource(
readerRowTypes.push_back(input->type());
}
remainingFilterSubfields = remainingFilterExpr->extractSubfields();
  VLOG(1) << fmt::format(
      "Extracted subfields from remaining filter: [{}]",
      fmt::join(remainingFilterSubfields, ", "));
for (auto& subfield : remainingFilterSubfields) {
auto& name = getColumnName(subfield);
auto it = subfields.find(name);
@@ -163,6 +163,15 @@ HiveDataSource::HiveDataSource(
ioStats_ = std::make_shared<io::IoStatistics>();
}

HiveDataSource::~HiveDataSource() {
VLOG(2) << " totalSplits : " << runtimeStats_.totalSplits
<< " skippedSplits : " << runtimeStats_.skippedSplits
<< " skippedSplitBytes : " << runtimeStats_.skippedSplitBytes
<< " skippedStrides : " << runtimeStats_.skippedStrides
<< " totalScannedRows : " << runtimeStats_.totalScannedRows
<< " totalOutputRows : " << runtimeStats_.totalOutputRows;
}

std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
return SplitReader::create(
split_,
@@ -195,14 +204,21 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
// so we initialize it beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);

VLOG(1) << "HiveDataSource::addSplit end. splitReader_=" << splitReader_.get();
if (splitReader_) {
VLOG(1) << " emptySplit=" << splitReader_->emptySplit();
}
}

std::optional<RowVectorPtr> HiveDataSource::next(
uint64_t size,
velox::ContinueFuture& /*future*/) {
VELOX_CHECK(split_ != nullptr, "No split to process. Call addSplit first.");

VLOG(1) << "HiveDataSource::next begin splitReader_=" << splitReader_.get();
if (splitReader_ && splitReader_->emptySplit()) {
VLOG(1) << " emptySplit is true, returning";
resetSplit();
return nullptr;
}
@@ -212,7 +228,11 @@ std::optional<RowVectorPtr> HiveDataSource::next(
}

auto rowsScanned = splitReader_->next(size, output_);
completedRows_ += rowsScanned;
runtimeStats_.totalScannedRows += rowsScanned;

VLOG(1) << " splitReader_->next finished. rowsScanned=" << rowsScanned
<< " runtimeStats_.totalScannedRows="
<< runtimeStats_.totalScannedRows;

if (rowsScanned) {
VELOX_CHECK(
@@ -261,6 +281,7 @@ std::optional<RowVectorPtr> HiveDataSource::next(
exec::wrapChild(rowsRemaining, remainingIndices, child));
}

runtimeStats_.totalOutputRows += rowsRemaining;
return std::make_shared<RowVector>(
pool_, outputType_, BufferPtr(nullptr), rowsRemaining, outputColumns);
}
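The commit drops the VLOG_IS_ON(1) guard around the subfield printout in HiveDataSource's constructor. That is safe because glog's VLOG(1) expands to LOG_IF(INFO, VLOG_IS_ON(1)), so the streamed operands, including the fmt::format call, are only evaluated when verbosity 1 is active. A sketch (logSubfields is an illustrative helper, not velox code):

    #include <fmt/format.h>
    #include <fmt/ranges.h>
    #include <glog/logging.h>
    #include <string>
    #include <vector>

    void logSubfields(const std::vector<std::string>& subfields) {
      // fmt::format runs only when -v >= 1.
      VLOG(1) << fmt::format(
          "Extracted subfields: [{}]", fmt::join(subfields, ", "));

      if (VLOG_IS_ON(1)) {
        // An explicit guard is still useful for multi-statement diagnostics
        // whose cost is not confined to the stream expression itself.
      }
    }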
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
@@ -46,6 +46,8 @@ class HiveDataSource : public DataSource {
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<HiveConfig>& hiveConfig);

~HiveDataSource();

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

std::optional<RowVectorPtr> next(uint64_t size, velox::ContinueFuture& future)
@@ -130,6 +132,8 @@ class HiveDataSource : public DataSource {
std::atomic<uint64_t> totalRemainingFilterTime_{0};
core::ExpressionEvaluator* expressionEvaluator_;
uint64_t completedRows_ = 0;

// Reusable memory for remaining filter evaluation.
VectorPtr filterResult_;
16 changes: 16 additions & 0 deletions velox/connectors/hive/SplitReader.cpp
@@ -144,10 +144,15 @@ void SplitReader::prepareSplit(
VELOX_CHECK_NE(
baseReaderOpts_.getFileFormat(), dwio::common::FileFormat::UNKNOWN);

VLOG(1) << " SplitReader::prepareSplit begin. emptySplit_=" << emptySplit_
<< " baseRowReader_ " << baseRowReader_.get();

std::shared_ptr<FileHandle> fileHandle;
try {
fileHandle = fileHandleFactory_->generate(hiveSplit_->filePath).second;
VLOG(1) << " created fileHandle " << fileHandle;
} catch (const VeloxRuntimeError& e) {
VLOG(1) << " failed to create fileHandle. ErrorCode= " << e.errorCode();
if (e.errorCode() == error_code::kFileNotFound &&
hiveConfig_->ignoreMissingFiles(
connectorQueryCtx_->sessionProperties())) {
@@ -174,6 +179,8 @@
emptySplit_ = false;
if (baseReader_->numberOfRows() == 0) {
emptySplit_ = true;
VLOG(1)
<< " baseReader_->numberOfRows() == 0. emptySplit_ = true. Returning";
return;
}

@@ -188,6 +195,9 @@
emptySplit_ = true;
++runtimeStats.skippedSplits;
runtimeStats.skippedSplitBytes += hiveSplit_->length;
VLOG(1) << " testFilters failed. runtimeStats.skippedSplits="
<< runtimeStats.skippedSplits << " baseRowReader_ "
<< baseRowReader_.get();
return;
}

@@ -205,6 +215,9 @@
// before setting up for the next one to avoid doubling the peak memory usage.
baseRowReader_.reset();
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);

VLOG(1) << " SplitReader::prepareSplit end. emptySplit_=" << emptySplit_
<< " baseRowReader_ " << baseRowReader_.get();
}

std::vector<TypePtr> SplitReader::adaptColumns(
@@ -281,6 +294,9 @@ std::vector<TypePtr> SplitReader::adaptColumns(
}

uint64_t SplitReader::next(int64_t size, VectorPtr& output) {
VLOG(1) << " SplitReader::next begin. emptySplit_=" << emptySplit_
<< " baseRowReader_ " << baseRowReader_.get();

if (!baseReaderOpts_.randomSkip()) {
return baseRowReader_->next(size, output);
}
7 changes: 7 additions & 0 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
@@ -58,6 +58,10 @@ void SelectiveStructColumnReaderBase::next(
VectorPtr& result,
const Mutation* mutation) {
process::TraceContext trace("SelectiveStructColumnReaderBase::next");
VLOG(1) << "SelectiveStructColumnReaderBase::next begin children_.size()"
<< children_.size()
<< " scanSpec_->children()=" << scanSpec_->children().size();

if (children_.empty()) {
if (mutation) {
if (mutation->deletedRows) {
@@ -78,10 +82,13 @@
auto& childSpecs = scanSpec_->children();
for (auto& childSpec : childSpecs) {
VELOX_CHECK(childSpec->isConstant());
VLOG(1) << " childSpec->fieldName=" << childSpec->fieldName()
<< " childSpec->projectOut()=" << childSpec->projectOut();
if (childSpec->projectOut()) {
auto channel = childSpec->channel();
resultRowVector->childAt(channel) = BaseVector::wrapInConstant(
numValues, 0, childSpec->constantValue());
VLOG(1) << " returning constant vector of size " << numValues;
}
}
return;
7 changes: 7 additions & 0 deletions velox/dwio/common/Statistics.h
@@ -526,6 +526,9 @@ struct ColumnReaderStatistics {
};

struct RuntimeStatistics {
// Total number of splits received.
int64_t totalSplits{0};

// Number of splits skipped based on statistics.
int64_t skippedSplits{0};

@@ -535,10 +538,14 @@
// Number of strides (row groups) skipped based on statistics.
int64_t skippedStrides{0};

// Total number of rows scanned from the files, before filtering.
int64_t totalScannedRows{0};

// Total number of rows produced after filters are applied.
int64_t totalOutputRows{0};

ColumnReaderStatistics columnReaderStatistics;

std::unordered_map<std::string, RuntimeCounter> toMap() {
return {
{"totalSplits_", RuntimeCounter(totalSplits)},
{"skippedSplits", RuntimeCounter(skippedSplits)},
{"skippedSplitBytes",
RuntimeCounter(skippedSplitBytes, RuntimeCounter::Unit::kBytes)},
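A hypothetical sketch of consuming the new counters after a scan; reportScanStats is illustrative, and it assumes RuntimeCounter exposes a public int64_t value, which the toMap() initializers above suggest:

    #include <glog/logging.h>

    #include "velox/dwio/common/Statistics.h"

    void reportScanStats(facebook::velox::dwio::common::RuntimeStatistics& stats) {
      for (const auto& [name, counter] : stats.toMap()) {
        LOG(INFO) << name << " = " << counter.value;
      }
    }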
4 changes: 4 additions & 0 deletions velox/exec/Driver.cpp
@@ -582,6 +582,7 @@ StopReason Driver::runInternal(
op,
curOperatorId_,
kOpMethodGetOutput);
VLOG(1) << "op->getOutput() finished. intermediateResult: " << intermediateResult;
if (intermediateResult) {
VELOX_CHECK(
intermediateResult->size() > 0,
@@ -608,6 +609,7 @@
});
{
auto lockedStats = nextOp->stats().wlock();
VLOG(1) << " addInputVector inputPositions=" << intermediateResult->size();
lockedStats->addInputVector(
resultBytes, intermediateResult->size());
}
@@ -627,6 +629,7 @@
continue;
} else {
stop = task()->shouldStop();
VLOG(1) << " intermediateResult is null. stop=" << stop;
if (stop != StopReason::kNone) {
guard.notThrown();
return stop;
@@ -654,6 +657,7 @@ StopReason Driver::runInternal(
op,
curOperatorId_,
kOpMethodIsFinished);
VLOG(1) << " op->isFinished()=" << finished;
if (finished) {
auto timer = createDeltaCpuWallTimer(
[op, this](const CpuWallTiming& timing) {
13 changes: 11 additions & 2 deletions velox/exec/TableScan.cpp
@@ -227,17 +227,26 @@ RowVectorPtr TableScan::getOutput() {
(getCurrentTimeMicro() - ioTimeStartMicros) * 1'000,
RuntimeCounter::Unit::kNanos));

VLOG(1) << " TableScan::getOutput has value " << dataOptional.has_value();
if (!dataOptional.has_value()) {
blockingReason_ = BlockingReason::kWaitForConnector;
return nullptr;
}

curStatus_ = "getOutput: updating stats_.rawInput";
  lockedStats->rawInputPositions += dataSource_->getCompletedRows();
  lockedStats->rawInputBytes += dataSource_->getCompletedBytes();

VLOG(1) << " TableScan::getOutput updating stats rawInputPositions "
<< lockedStats->rawInputPositions;

auto data = dataOptional.value();
if (data) {
VLOG(1) << " TableScan::getOutput data->size()=" << data->size();
if (data->size() > 0) {
VLOG(1) << " TableScan::getOutput updating stats inputPositions "
<< data->size();

lockedStats->addInputVector(data->estimateFlatSize(), data->size());
constexpr int kMaxSelectiveBatchSizeMultiplier = 4;
maxFilteringRatio_ = std::max(

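Note that none of these printouts appear unless glog verbose logging is enabled, for example by running with --v=1 (or GLOG_v=1 in the environment) and logging to stderr. A sketch of enabling them programmatically in a test harness; the flag names are glog's standard FLAGS_v and FLAGS_logtostderr:

    #include <glog/logging.h>

    int main(int argc, char** argv) {
      FLAGS_v = 1;              // enable all VLOG(1) call sites
      FLAGS_logtostderr = true; // print to stderr instead of log files
      google::InitGoogleLogging(argv[0]);
      // ... run the scan under test ...
      return 0;
    }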