Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] optimize append_selectivity performance #54261

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

Seaven
Copy link
Contributor

@Seaven Seaven commented Dec 24, 2024

Why I'm doing:

What I'm doing:

Fixes #issue

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.4
    • 3.3
    • 3.2
    • 3.1
    • 3.0

*(dst_offsets + 1) = *(dst_offsets) + str_size;
strings::memcpy_inlined(dst_bytes, src_bytes + src_offsets[next_idx], str_size);

_bytes.resize(*(dst_offsets + 1));
}

_slices_cache = false;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Memory access out of bounds due to missing prefetch setup for the last element, leading potentially to undefined behavior when accessing src_offsets[next_idx + 1] without ensuring next_idx is a valid index.

You can modify the code like this:

+        idx = indexes[from + size - 1]; // Correct placement within the loop after proper bounds check or resizing
        T str_size = src_offsets[idx + 1] - src_offsets[idx];
        *(dst_offsets + 1) = *(dst_offsets) + str_size;
        strings::memcpy_inlined(dst_bytes, src_bytes + src_offsets[idx], str_size);

        _bytes.resize(*(dst_offsets + 1));

std::vector<bool> orders = {true};
std::vector<bool> null_fists = {false};
_sort_descs = SortDescs(orders, null_fists);

_unique_metrics->add_info_string("ShuffleNumPerChannel", std::to_string(_num_shuffles_per_channel));
_unique_metrics->add_info_string("TotalShuffleNum", std::to_string(_num_shuffles));
_unique_metrics->add_info_string("PipelineLevelShuffle", _is_pipeline_level_shuffle ? "Yes" : "No");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Improper handling of the scenario when chunk's size exceeds the configuration's chunk_size, which can lead to sorting and sending a wrong version of chunk.

You can modify the code like this:

if (_chunks[driver_sequence]->num_rows() + size > state->chunk_size()) {
    if (config::enable_shuffle_sort) {
        Permutation _sort_permutation;
        _sort_permutation.resize(0);
        Columns orderby_columns;
        for (auto& expr : _parent->_sort_expr_ctxs) {
            ASSIGN_OR_RETURN(auto col, expr->evaluate(_chunks[driver_sequence].get()));
            orderby_columns.emplace_back(col);
        }
        RETURN_IF_ERROR(sort_and_tie_columns(state->cancelled_ref(), orderby_columns, _parent->_sort_descs,
                                             &_sort_permutation));
        auto sorted_chunk =
                _chunks[driver_sequence]->clone_empty_with_slot(_sort_permutation.size());
        materialize_by_permutation(sorted_chunk.get(), _chunks[driver_sequence].get(), _sort_permutation);
        RETURN_IF_ERROR(send_one_chunk(state, sorted_chunk.get(), driver_sequence, false));
    } else {
        RETURN_IF_ERROR(send_one_chunk(state, _chunks[driver_sequence].get(), driver_sequence, false));
    }

    // we only clear column data, because we need to reuse column schema
    _chunks[driver_sequence]->set_num_rows(0);
}

Explanation: The call to materialize_by_permutation should be operating on _chunks[driver_sequence] instead of chunk to correctly sort and send the accumulated chunks. This change ensures that the chunks are processed in their entirety, respecting the constraints of chunk size.

}
#endif
// Handle the remaining elements
for (; i < size; ++i) {
_data[orig_size + i] = src_data[indexes[from + i]];
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
There is a potential type mismatch error in the AVX2 part where integer types are cast to int* for fsrc_data, potentially causing incorrect memory access or data corruption when T is not int32_t.

You can modify the code like this:

void FixedLengthColumnBase<T>::append_selective(const Column& src, const uint32_t* indexes, size_t from, size_t size){
    const T* src_data = reinterpret_cast<const T*>(src.raw_data());
    size_t orig_size = _data.size();
    _data.resize(orig_size + size);

    size_t i = 0;
#ifdef __AVX2__
    if (config::enable_avx_gather) {
        T* store = _data.data() + orig_size;
        if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint8_t> || std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t>) {
            using IntType = std::conditional_t<std::is_same_v<T, int32_t>, int, int>;
            const IntType* fsrc_data = reinterpret_cast<const IntType*>(src_data);
            
            if constexpr (std::is_same_v<T, int32_t>) {
                for (; i + 7 < size; i += 8) {
                    __m256i index_vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(indexes + from + i));
                    __m256i data_vec = _mm256_i32gather_epi32(fsrc_data, index_vec, 4);
                    _mm256_storeu_si256(reinterpret_cast<__m256i*>(store + i), data_vec);
                }
            } else {
                constexpr int type_size = sizeof(T);
                int temp[8] = {0};
                for (; i + 7 < size; i += 8) {
                    __m256i index_vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(indexes + from + i));
                    __m256i data_vec = _mm256_i32gather_epi32(fsrc_data, index_vec, type_size);
                    _mm256_storeu_si256(reinterpret_cast<__m256i*>(temp), data_vec);
                    for (int j = 0; j < 8; j++) {
                        store[i + j] = temp[j];
                    }
                }
            }
        } else if constexpr (std::is_same_v<T, int64_t>) {
            const long long int* fsrc_data = (const long long int*)src_data;
            for (; i + 3 < size; i += 4) {
                __m128i index_vec = _mm_loadu_si128(reinterpret_cast<const __m128i*>(indexes + from + i));
                __m256i data_vec = _mm256_i32gather_epi64(fsrc_data, index_vec, 8);
                _mm256_storeu_si256(reinterpret_cast<__m256i*>(store + i), data_vec);
            }
        } 
    }
#endif
    for (; i < size; ++i) {
        _data[orig_size + i] = src_data[indexes[from + i]];
    }
}

Signed-off-by: Seaven <[email protected]>
Signed-off-by: Seaven <[email protected]>
Signed-off-by: Seaven <[email protected]>
Signed-off-by: Seaven <[email protected]>
Signed-off-by: Seaven <[email protected]>
Signed-off-by: Seaven <[email protected]>
@wanpengfei-git wanpengfei-git requested a review from a team January 2, 2025 07:04
Signed-off-by: Seaven <[email protected]>
Copy link

github-actions bot commented Jan 3, 2025

[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)

Copy link

github-actions bot commented Jan 3, 2025

[FE Incremental Coverage Report]

pass : 7 / 7 (100.00%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 com/starrocks/sql/plan/PlanFragmentBuilder.java 1 1 100.00% []
🔵 com/starrocks/planner/PlanNode.java 6 6 100.00% []

Copy link

github-actions bot commented Jan 3, 2025

[BE Incremental Coverage Report]

fail : 32 / 83 (38.55%)

file detail

path covered_line new_line coverage not_covered_line_detail
🔵 src/exec/aggregate/agg_profile.h 0 2 00.00% [38, 39]
🔵 src/exec/aggregate/agg_hash_set.h 0 11 00.00% [105, 106, 108, 109, 111, 112, 113, 114, 182, 283, 390]
🔵 src/exec/pipeline/exchange/exchange_sink_operator.cpp 0 12 00.00% [203, 204, 207, 209, 210, 213, 215, 217, 218, 446, 449, 450]
🔵 src/column/binary_column.cpp 21 40 52.50% [93, 95, 96, 97, 99, 100, 102, 103, 104, 105, 106, 107, 109, 111, 112, 113, 114, 115, 119]
🔵 src/column/fixed_length_column_base.cpp 8 14 57.14% [57, 59, 60, 61, 62, 63]
🔵 src/exec/aggregator.cpp 3 4 75.00% [713]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants