Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support DataSketches Theta Sketche #54568

Open
wants to merge 50 commits into
base: main
Choose a base branch
from

Conversation

chenminghua8
Copy link
Contributor

Why I'm doing:

What I'm doing:

Implement the aggregation function that supports DataSketches Theta:

  1. Add ds_theta_state.h to implement the AggregateState that supports DataSketches Theta;
  2. Add theta_sketch.h and theta_sketch.cpp to implement the encapsulation of DataSketches Theta Sketche;
  3. Register the ds_theta function through SketchType::THETA generic to implement the function that supports DataSketches Theta.

Fixes #50671

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.4
    • 3.3
    • 3.2
    • 3.1
    • 3.0

chenminghua8 and others added 30 commits October 22, 2024 19:10
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
Signed-off-by: chenminghua8 <[email protected]>
@chenminghua8 chenminghua8 requested review from a team as code owners December 31, 2024 10:27
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Dec 31, 2024
}
};

} // namespace starrocks
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Potential undefined behavior due to uninitialized memory access when serializing the data.

You can modify the code like this:

void serialize_to_column([[maybe_unused]] FunctionContext* ctx, ConstAggDataPtr __restrict state,
                         Column* to) const override {
    DCHECK(to->is_binary());
    auto* column = down_cast<BinaryColumn*>(to);
    if (UNLIKELY(!this->data(state).is_inited())) {
        column->append_default();
    } else {
        size_t serialized_size = this->data(state).serialize_size();
        std::vector<uint8_t> result(serialized_size); // Use vector to manage memory safely
        size_t actual_size = this->data(state).serialize(result.data());
        column->append(Slice(result.data(), actual_size));
    }
}

This change uses a std::vector<uint8_t> instead of a stack-allocated array with variable length, ensuring memory is properly initialized and managed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

fields.add(new StructField("upper_bound", Type.BIGINT));
return new ArrayType(new StructType(fields, true));
};

public List<Function> getBuiltinFunctions() {
List<Function> builtinFunctions = Lists.newArrayList();
for (Map.Entry<String, List<Function>> entry : vectorizedFunctions.entrySet()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
The function DS_HLL_COUNT_DISTINCT appears to be registered twice, which could lead to incorrect behavior if each registration has different characteristics or versions.

You can modify the code like this:

// In registerBuiltinDsFunction(), remove redundant registrations for DS_HLL_COUNT_DISTINCT

private void registerBuiltinDsFunction() {
    for (Type t : Type.getSupportedTypes()) {
        if (t.isFunctionType()) {
            continue;
        }
        if (t.isNull()) {
            continue; // NULL is handled through type promotion.
        }
        if (t.isChar()) {
            continue; // promoted to STRING
        }

        if (t.isPseudoType()) {
            continue; // promoted to pseudo
        }

        // Ensure no duplicate registrations for the same function.
        // ds_hll_count_distinct(col)
        addBuiltin(AggregateFunction.createBuiltin(
                DS_HLL_COUNT_DISTINCT, Lists.newArrayList(t), Type.BIGINT, Type.VARBINARY,
                true, false, true));

        // ds_theta(col)
        addBuiltin(AggregateFunction.createBuiltin(
                DS_THETA, Lists.newArrayList(t), Type.BIGINT, Type.VARBINARY,
                true, false, true));
    }
}

This change ensures that the DS_HLL_COUNT_DISTINCT function gets registered correctly without adding it multiple times unintentionally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function parameters are different and need to be registered multiple times.

}
};

} // namespace starrocks No newline at end of file
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Dereferencing a potentially null pointer ds_sketch_wrapper in the update, update_batch_single_state_with_frame, and serialize functions without checking if it's initialized.

You can modify the code like this:

void update(const Column* data_column, size_t row_num) const {
    if (!is_inited()) return; // Check for initialization
    uint64_t value = 0;
    const ColumnType* column = down_cast<const ColumnType*>(data_column);

    if constexpr (lt_is_string<LT>) {
        Slice s = column->get_slice(row_num);
        value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED);
    } else {
        const auto& v = column->get_data();
        value = HashUtil::murmur_hash64A(&v[row_num], sizeof(v[row_num]), HashUtil::MURMUR_SEED);
    }
    ds_sketch_wrapper->update(value);
}

void update_batch_single_state_with_frame(const Column* data_column, int64_t frame_start, int64_t frame_end) const {
    if (!is_inited()) return; // Check for initialization
    const ColumnType* column = down_cast<const ColumnType*>(data_column);
    if constexpr (lt_is_string<LT>) {
        uint64_t value = 0;
        for (size_t i = frame_start; i < frame_end; ++i) {
            Slice s = column->get_slice(i);
            value = HashUtil::murmur_hash64A(s.data, s.size, HashUtil::MURMUR_SEED);

            if (value != 0) {
                ds_sketch_wrapper->update(value);
            }
        }
    } else {
        uint64_t value = 0;
        const auto& v = column->get_data();
        for (size_t i = frame_start; i < frame_end; ++i) {
            value = HashUtil::murmur_hash64A(&v[i], sizeof(v[i]), HashUtil::MURMUR_SEED);

            if (value != 0) {
                ds_sketch_wrapper->update(value);
            }
        }
    }
}

size_t serialize(uint8_t* dst) const {
    if (!is_inited()) return 0; // Check for initialization
    return ds_sketch_wrapper->serialize(dst);
}

This modification ensures that ds_sketch_wrapper is initialized before calling its methods, preventing potential null pointer dereferences.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to check whether it has been initialized, because the method calling it has already done the corresponding processing.

Copy link

github-actions bot commented Jan 2, 2025

[FE Incremental Coverage Report]

pass : 29 / 36 (80.56%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 com/starrocks/sql/analyzer/FunctionAnalyzer.java 2 3 66.67% [539]
🔵 com/starrocks/catalog/FunctionSet.java 24 30 80.00% [1637, 1638, 1639, 1640, 1641, 1642]
🔵 com/starrocks/sql/optimizer/rule/tree/PreAggregateTurnOnRule.java 3 3 100.00% []

Copy link

github-actions bot commented Jan 2, 2025

[BE Incremental Coverage Report]

pass : 2 / 2 (100.00%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 src/exprs/agg/factory/aggregate_factory.hpp 2 2 100.00% []

Copy link

sonarqubecloud bot commented Jan 3, 2025

Copy link

github-actions bot commented Jan 3, 2025

[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Aggregate Function] Support DataSketches in StarRocks
1 participant