Skip to content

Commit

Permalink
[Enhancement] push runtime in_filter through agg node (#52625)
Browse files Browse the repository at this point in the history
Signed-off-by: stephen <[email protected]>
  • Loading branch information
stephen-shelby authored Nov 5, 2024
1 parent a5373f9 commit f8d2356
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
18 changes: 18 additions & 0 deletions be/src/exec/aggregate/aggregate_base_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ void AggregateBaseNode::close(RuntimeState* state) {
ExecNode::close(state);
}

void AggregateBaseNode::push_down_tuple_slot_mappings(RuntimeState* state,
const std::vector<TupleSlotMapping>& parent_mappings) {
_tuple_slot_mappings = parent_mappings;

DCHECK(_tuple_ids.size() == 1);
for (auto& expr_ctx : _group_by_expr_ctxs) {
if (expr_ctx->root()->is_slotref()) {
auto ref = dynamic_cast<ColumnRef*>(expr_ctx->root());
DCHECK(ref != nullptr);
_tuple_slot_mappings.emplace_back(ref->tuple_id(), ref->slot_id(), _tuple_ids[0], ref->slot_id());
}
}

for (auto& child : _children) {
child->push_down_tuple_slot_mappings(state, _tuple_slot_mappings);
}
}

void AggregateBaseNode::push_down_join_runtime_filter(RuntimeState* state, RuntimeFilterProbeCollector* collector) {
// accept runtime filters from parent if possible.
_runtime_filter_collector.push_down(state, id(), collector, _tuple_ids, _local_rf_waiting_set);
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/aggregate/aggregate_base_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class AggregateBaseNode : public ExecNode {
Status prepare(RuntimeState* state) override;
void close(RuntimeState* state) override;
void push_down_join_runtime_filter(RuntimeState* state, RuntimeFilterProbeCollector* collector) override;
void push_down_tuple_slot_mappings(RuntimeState* state,
const std::vector<TupleSlotMapping>& parent_mappings) override;

protected:
const TPlanNode& _tnode;
Expand Down

0 comments on commit f8d2356

Please sign in to comment.