// Patch overview. Everything after this comment block is the original git
// unified diff, unchanged. It touches three files.
//
// velox/exec/HashProbe.cpp
//   * HashProbe::initialize(): each probe-side column whose name is also found
//     in outputType_ is now stored in projectedInputColumns_ as
//     "input channel -> output channel" (previously only the input channel was
//     inserted). identityProjections_ and the numIdentityProjections counter
//     are now only updated when the join is neither a right join nor a full
//     join. The column name is taken as `auto&` instead of `auto`.
//   * HashProbe::fillOutput() and HashProbe::getBuildSideOutput() iterate
//     projectedInputColumns_ (as [in, out] pairs) instead of
//     identityProjections_ when wrapping probe-side input columns or filling
//     them with null constants.
//   * clearIdentityProjectedOutput() is renamed to clearProjectedOutput(); it
//     now resets the output children listed in projectedInputColumns_, and the
//     call in getOutputInternal() is updated to the new name.
//
// velox/exec/HashProbe.h
//   * The declaration is renamed to clearProjectedOutput(), and the member
//     projectedInputColumns_ changes from std::unordered_set to
//     folly::F14FastMap.
//
// velox/exec/tests/HashJoinTest.cpp
//   * Adds HashJoinTest.noDynamicFiltersPushDownThroughRightJoin. The plan is a
//     table scan over a written file with columns "aa" and "bb", a
//     core::JoinType::kRight hash join on bb = b that outputs {"aa", "b"}, and
//     then a second hashJoin call on aa = a that passes no explicit join type
//     and outputs {"aa"}. The expected result is an empty vector of
//     innerBuild's row type.
//
// NOTE(review): judging by the new test's name, the intent appears to be to
// stop dynamic filters from being pushed down through right/full joins. The
// patch does not show where identityProjections_ is consumed for that, so
// confirm against the full HashProbe / Operator sources.
// NOTE(review): this copy looks damaged by extraction. The hunks have lost
// their line breaks, and angle-bracketed template arguments appear to be
// missing (e.g. `std::vector innerBuild`, `std::make_shared()`,
// `folly::F14FastMap projectedInputColumns_`). It most likely cannot be applied
// with `git apply` as-is; regenerate it from the original commit before use.
diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 885d098491be..2b0111353f89 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -167,12 +167,15 @@ void HashProbe::initialize() { size_t numIdentityProjections = 0; for (auto i = 0; i < probeType_->size(); ++i) { - auto name = probeType_->nameOf(i); + auto& name = probeType_->nameOf(i); auto outIndex = outputType_->getChildIdxIfExists(name); - if (outIndex.has_value()) { - projectedInputColumns_.insert(i); - identityProjections_.emplace_back(i, outIndex.value()); - if (outIndex.value() == i) { + if (!outIndex.has_value()) { + continue; + } + projectedInputColumns_[i] = *outIndex; + if (!isRightJoin(joinType_) && !isFullJoin(joinType_)) { + identityProjections_.emplace_back(i, *outIndex); + if (*outIndex == i) { ++numIdentityProjections; } } @@ -828,14 +831,12 @@ void HashProbe::fillLeftSemiProjectMatchColumn(vector_size_t size) { void HashProbe::fillOutput(vector_size_t size) { prepareOutput(size); - for (auto projection : identityProjections_) { + for (auto [in, out] : projectedInputColumns_) { // Load input vector if it is being split into multiple batches. It is not // safe to wrap unloaded LazyVector into two different dictionaries. - ensureLoadedIfNotAtEnd(projection.inputChannel); - auto inputChild = input_->childAt(projection.inputChannel); - - output_->childAt(projection.outputChannel) = - wrapChild(size, outputRowMapping_, inputChild); + ensureLoadedIfNotAtEnd(in); + auto inputChild = input_->childAt(in); + output_->childAt(out) = wrapChild(size, outputRowMapping_, inputChild); } if (isLeftSemiProjectJoin(joinType_)) { @@ -882,9 +883,9 @@ RowVectorPtr HashProbe::getBuildSideOutput() { prepareOutput(numOut); // Populate probe-side columns of the output with nulls. 
- for (auto projection : identityProjections_) { - output_->childAt(projection.outputChannel) = BaseVector::createNullConstant( - outputType_->childAt(projection.outputChannel), numOut, pool()); + for (auto [in, out] : projectedInputColumns_) { + output_->childAt(out) = BaseVector::createNullConstant( + outputType_->childAt(out), numOut, pool()); } extractColumns( @@ -914,12 +915,12 @@ RowVectorPtr HashProbe::getBuildSideOutput() { return output_; } -void HashProbe::clearIdentityProjectedOutput() { +void HashProbe::clearProjectedOutput() { if (!output_ || output_.use_count() != 1) { return; } - for (auto& projection : identityProjections_) { - output_->childAt(projection.outputChannel) = nullptr; + for (auto& [_, out] : projectedInputColumns_) { + output_->childAt(out) = nullptr; } } @@ -1005,7 +1006,7 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) { return output_; } - clearIdentityProjectedOutput(); + clearProjectedOutput(); if (!input_) { if (hasMoreInput()) { diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 8166db111017..81548b8f80cb 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -129,7 +129,7 @@ class HashProbe : public Operator { // number mappings or input vectors. In this way input vectors do // not have to be copied and will be singly referenced by their // producer. - void clearIdentityProjectedOutput(); + void clearProjectedOutput(); // Populate output columns with matching build-side rows // for the right semi join and non-matching build-side rows @@ -422,7 +422,7 @@ class HashProbe : public Operator { RowTypePtr filterInputType_; // The input channels that are projected to the output. - std::unordered_set projectedInputColumns_; + folly::F14FastMap projectedInputColumns_; // Maps input channels to channels in 'filterInputType_'. 
 std::vector filterInputProjections_; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 470b9026b403..7698cecd3aa8 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5305,6 +5305,51 @@ TEST_F(HashJoinTest, dynamicFiltersPushDownThroughAgg) { .run(); } +TEST_F(HashJoinTest, noDynamicFiltersPushDownThroughRightJoin) { + std::vector innerBuild = {makeRowVector( + {"a"}, + { + makeFlatVector(5, [](auto i) { return 2 * i; }), + })}; + std::vector rightBuild = {makeRowVector( + {"b"}, + { + makeFlatVector(5, [](auto i) { return 1 + 2 * i; }), + })}; + std::vector rightProbe = {makeRowVector( + {"aa", "bb"}, + { + makeFlatVector(10, folly::identity), + makeFlatVector(10, folly::identity), + })}; + auto file = TempFilePath::create(); + writeToFile(file->getPath(), rightProbe); + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId; + auto plan = + PlanBuilder(planNodeIdGenerator) + .tableScan(asRowType(rightProbe[0]->type())) + .capturePlanNodeId(scanNodeId) + .hashJoin( + {"bb"}, + {"b"}, + PlanBuilder(planNodeIdGenerator).values(rightBuild).planNode(), + "", + {"aa", "b"}, + core::JoinType::kRight) + .hashJoin( + {"aa"}, + {"a"}, + PlanBuilder(planNodeIdGenerator).values(innerBuild).planNode(), + "", + {"aa"}) + .planNode(); + AssertQueryBuilder(plan) + .split(scanNodeId, Split(makeHiveConnectorSplit(file->getPath()))) + .assertResults( + BaseVector::create(innerBuild[0]->type(), 0, pool_.get())); +} + // Verify the size of the join output vectors when projecting build-side // variable-width column. TEST_F(HashJoinTest, memoryUsage) {