Skip to content

Commit

Permalink
fix: Incorrect result when hash probe dynamic filters push down throu…
Browse files Browse the repository at this point in the history
…gh right join

Summary:
Right/full join could introduce extra null values in the output column compared to probe side, so they should not be considered as "identity projection" and not qualified for dynamic filter push through.  When the condition of completely replacement of hash probe is satisfied, we were allowing all these extra null rows to pass through the join and generating incorrect result.  This change fixes this bug.

It's possible to generate a filter using hash join, but not completely replacing the hash probe though, since the extra nulls should be filtered out during the actual probe process.  Having such mechanism requires extra information to be added in addition to "identity projections" though (i.e. whether only new nulls will be introduced for certain column).  We can revisit this if we see there is a use case for such optimization.

Differential Revision: D66833193
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 5, 2024
1 parent 3dd572f commit 078ef5c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 20 deletions.
37 changes: 19 additions & 18 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ void HashProbe::initialize() {

size_t numIdentityProjections = 0;
for (auto i = 0; i < probeType_->size(); ++i) {
auto name = probeType_->nameOf(i);
auto& name = probeType_->nameOf(i);
auto outIndex = outputType_->getChildIdxIfExists(name);
if (outIndex.has_value()) {
projectedInputColumns_.insert(i);
identityProjections_.emplace_back(i, outIndex.value());
if (outIndex.value() == i) {
if (!outIndex.has_value()) {
continue;
}
projectedInputColumns_[i] = *outIndex;
if (!isRightJoin(joinType_) && !isFullJoin(joinType_)) {
identityProjections_.emplace_back(i, *outIndex);
if (*outIndex == i) {
++numIdentityProjections;
}
}
Expand Down Expand Up @@ -828,14 +831,12 @@ void HashProbe::fillLeftSemiProjectMatchColumn(vector_size_t size) {
void HashProbe::fillOutput(vector_size_t size) {
prepareOutput(size);

for (auto projection : identityProjections_) {
for (auto [in, out] : projectedInputColumns_) {
// Load input vector if it is being split into multiple batches. It is not
// safe to wrap unloaded LazyVector into two different dictionaries.
ensureLoadedIfNotAtEnd(projection.inputChannel);
auto inputChild = input_->childAt(projection.inputChannel);

output_->childAt(projection.outputChannel) =
wrapChild(size, outputRowMapping_, inputChild);
ensureLoadedIfNotAtEnd(in);
auto inputChild = input_->childAt(in);
output_->childAt(out) = wrapChild(size, outputRowMapping_, inputChild);
}

if (isLeftSemiProjectJoin(joinType_)) {
Expand Down Expand Up @@ -882,9 +883,9 @@ RowVectorPtr HashProbe::getBuildSideOutput() {
prepareOutput(numOut);

// Populate probe-side columns of the output with nulls.
for (auto projection : identityProjections_) {
output_->childAt(projection.outputChannel) = BaseVector::createNullConstant(
outputType_->childAt(projection.outputChannel), numOut, pool());
for (auto [in, out] : projectedInputColumns_) {
output_->childAt(out) = BaseVector::createNullConstant(
outputType_->childAt(out), numOut, pool());
}

extractColumns(
Expand Down Expand Up @@ -914,12 +915,12 @@ RowVectorPtr HashProbe::getBuildSideOutput() {
return output_;
}

void HashProbe::clearIdentityProjectedOutput() {
void HashProbe::clearProjectedOutput() {
if (!output_ || output_.use_count() != 1) {
return;
}
for (auto& projection : identityProjections_) {
output_->childAt(projection.outputChannel) = nullptr;
for (auto& [_, out] : projectedInputColumns_) {
output_->childAt(out) = nullptr;
}
}

Expand Down Expand Up @@ -1005,7 +1006,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
return output_;
}

clearIdentityProjectedOutput();
clearProjectedOutput();

if (!input_) {
if (hasMoreInput()) {
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class HashProbe : public Operator {
// number mappings or input vectors. In this way input vectors do
// not have to be copied and will be singly referenced by their
// producer.
void clearIdentityProjectedOutput();
void clearProjectedOutput();

// Populate output columns with matching build-side rows
// for the right semi join and non-matching build-side rows
Expand Down Expand Up @@ -422,7 +422,7 @@ class HashProbe : public Operator {
RowTypePtr filterInputType_;

// The input channels that are projected to the output.
std::unordered_set<column_index_t> projectedInputColumns_;
folly::F14FastMap<column_index_t, column_index_t> projectedInputColumns_;

// Maps input channels to channels in 'filterInputType_'.
std::vector<IdentityProjection> filterInputProjections_;
Expand Down
45 changes: 45 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5305,6 +5305,51 @@ TEST_F(HashJoinTest, dynamicFiltersPushDownThroughAgg) {
.run();
}

TEST_F(HashJoinTest, noDynamicFiltersPushDownThroughRightJoin) {
std::vector<RowVectorPtr> innerBuild = {makeRowVector(
{"a"},
{
makeFlatVector<int64_t>(5, [](auto i) { return 2 * i; }),
})};
std::vector<RowVectorPtr> rightBuild = {makeRowVector(
{"b"},
{
makeFlatVector<int64_t>(5, [](auto i) { return 1 + 2 * i; }),
})};
std::vector<RowVectorPtr> rightProbe = {makeRowVector(
{"aa", "bb"},
{
makeFlatVector<int64_t>(10, folly::identity),
makeFlatVector<int64_t>(10, folly::identity),
})};
auto file = TempFilePath::create();
writeToFile(file->getPath(), rightProbe);
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId scanNodeId;
auto plan =
PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(rightProbe[0]->type()))
.capturePlanNodeId(scanNodeId)
.hashJoin(
{"bb"},
{"b"},
PlanBuilder(planNodeIdGenerator).values(rightBuild).planNode(),
"",
{"aa", "b"},
core::JoinType::kRight)
.hashJoin(
{"aa"},
{"a"},
PlanBuilder(planNodeIdGenerator).values(innerBuild).planNode(),
"",
{"aa"})
.planNode();
AssertQueryBuilder(plan)
.split(scanNodeId, Split(makeHiveConnectorSplit(file->getPath())))
.assertResults(
BaseVector::create<RowVector>(innerBuild[0]->type(), 0, pool_.get()));
}

// Verify the size of the join output vectors when projecting build-side
// variable-width column.
TEST_F(HashJoinTest, memoryUsage) {
Expand Down

0 comments on commit 078ef5c

Please sign in to comment.