[WIP] optimize append_selectivity performance #54261
base: main
Conversation
*(dst_offsets + 1) = *(dst_offsets) + str_size;
strings::memcpy_inlined(dst_bytes, src_bytes + src_offsets[next_idx], str_size);

_bytes.resize(*(dst_offsets + 1));
}

_slices_cache = false;
The most risky bug in this code is: memory access out of bounds due to a missing prefetch setup for the last element, potentially leading to undefined behavior when accessing `src_offsets[next_idx + 1]` without ensuring `next_idx` is a valid index.
You can modify the code like this:
idx = indexes[from + size - 1]; // Correct placement within the loop after proper bounds check or resizing
T str_size = src_offsets[idx + 1] - src_offsets[idx];
*(dst_offsets + 1) = *(dst_offsets) + str_size;
strings::memcpy_inlined(dst_bytes, src_bytes + src_offsets[idx], str_size);
_bytes.resize(*(dst_offsets + 1));
std::vector<bool> orders = {true};
std::vector<bool> null_fists = {false};
_sort_descs = SortDescs(orders, null_fists);

_unique_metrics->add_info_string("ShuffleNumPerChannel", std::to_string(_num_shuffles_per_channel));
_unique_metrics->add_info_string("TotalShuffleNum", std::to_string(_num_shuffles));
_unique_metrics->add_info_string("PipelineLevelShuffle", _is_pipeline_level_shuffle ? "Yes" : "No");
The most risky bug in this code is: improper handling of the scenario when `chunk`'s size exceeds the configuration's `chunk_size`, which can lead to sorting and sending a wrong version of `chunk`.
You can modify the code like this:
if (_chunks[driver_sequence]->num_rows() + size > state->chunk_size()) {
if (config::enable_shuffle_sort) {
Permutation _sort_permutation;
_sort_permutation.resize(0);
Columns orderby_columns;
for (auto& expr : _parent->_sort_expr_ctxs) {
ASSIGN_OR_RETURN(auto col, expr->evaluate(_chunks[driver_sequence].get()));
orderby_columns.emplace_back(col);
}
RETURN_IF_ERROR(sort_and_tie_columns(state->cancelled_ref(), orderby_columns, _parent->_sort_descs,
&_sort_permutation));
auto sorted_chunk =
_chunks[driver_sequence]->clone_empty_with_slot(_sort_permutation.size());
materialize_by_permutation(sorted_chunk.get(), _chunks[driver_sequence].get(), _sort_permutation);
RETURN_IF_ERROR(send_one_chunk(state, sorted_chunk.get(), driver_sequence, false));
} else {
RETURN_IF_ERROR(send_one_chunk(state, _chunks[driver_sequence].get(), driver_sequence, false));
}
// we only clear column data, because we need to reuse column schema
_chunks[driver_sequence]->set_num_rows(0);
}
Explanation: The call to `materialize_by_permutation` should operate on `_chunks[driver_sequence]` instead of `chunk` to correctly sort and send the accumulated chunks. This change ensures that the chunks are processed in their entirety, respecting the chunk-size constraint.
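The sort-then-materialize step in the suggestion (compute a permutation over the order-by column, then gather every row of the accumulated chunk through it) can be shown with plain standard-library types. This is a generic sketch, not StarRocks' `sort_and_tie_columns` / `materialize_by_permutation` API; `make_sort_permutation` and `materialize` are illustrative names, and a single `int` key column stands in for the real order-by columns.

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

// Build the permutation that would sort `keys` ascending, without moving
// the keys themselves (the analogue of sort_and_tie_columns filling a
// Permutation).
std::vector<size_t> make_sort_permutation(const std::vector<int>& keys) {
    std::vector<size_t> perm(keys.size());
    std::iota(perm.begin(), perm.end(), 0);
    std::stable_sort(perm.begin(), perm.end(),
                     [&](size_t a, size_t b) { return keys[a] < keys[b]; });
    return perm;
}

// Gather one column's rows in permutation order (the analogue of
// materialize_by_permutation applied to each column of the chunk).
std::vector<int> materialize(const std::vector<int>& col,
                             const std::vector<size_t>& perm) {
    std::vector<int> out;
    out.reserve(perm.size());
    for (size_t p : perm) out.push_back(col[p]);
    return out;
}
```

The point of the fix above is simply which data gets gathered: the permutation must be applied to the accumulated `_chunks[driver_sequence]`, the same data it was computed from, not to the incoming `chunk`.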
    }
#endif
    // Handle the remaining elements
    for (; i < size; ++i) {
        _data[orig_size + i] = src_data[indexes[from + i]];
    }
}
The most risky bug in this code is: a potential type mismatch in the AVX2 path, where integer types are cast to `int*` for `fsrc_data`, potentially causing incorrect memory access or data corruption when `T` is not `int32_t`.
You can modify the code like this:
void FixedLengthColumnBase<T>::append_selective(const Column& src, const uint32_t* indexes, size_t from, size_t size){
const T* src_data = reinterpret_cast<const T*>(src.raw_data());
size_t orig_size = _data.size();
_data.resize(orig_size + size);
size_t i = 0;
#ifdef __AVX2__
if (config::enable_avx_gather) {
T* store = _data.data() + orig_size;
if constexpr (std::is_same_v<T, int32_t> || std::is_same_v<T, uint8_t> || std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t>) {
                using IntType = int; // _mm256_i32gather_epi32 takes an int* base regardless of T's width
const IntType* fsrc_data = reinterpret_cast<const IntType*>(src_data);
if constexpr (std::is_same_v<T, int32_t>) {
for (; i + 7 < size; i += 8) {
__m256i index_vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(indexes + from + i));
__m256i data_vec = _mm256_i32gather_epi32(fsrc_data, index_vec, 4);
_mm256_storeu_si256(reinterpret_cast<__m256i*>(store + i), data_vec);
}
} else {
constexpr int type_size = sizeof(T);
int temp[8] = {0};
for (; i + 7 < size; i += 8) {
__m256i index_vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(indexes + from + i));
__m256i data_vec = _mm256_i32gather_epi32(fsrc_data, index_vec, type_size);
_mm256_storeu_si256(reinterpret_cast<__m256i*>(temp), data_vec);
for (int j = 0; j < 8; j++) {
store[i + j] = temp[j];
}
}
}
} else if constexpr (std::is_same_v<T, int64_t>) {
const long long int* fsrc_data = (const long long int*)src_data;
for (; i + 3 < size; i += 4) {
__m128i index_vec = _mm_loadu_si128(reinterpret_cast<const __m128i*>(indexes + from + i));
__m256i data_vec = _mm256_i32gather_epi64(fsrc_data, index_vec, 8);
_mm256_storeu_si256(reinterpret_cast<__m256i*>(store + i), data_vec);
}
}
}
#endif
for (; i < size; ++i) {
_data[orig_size + i] = src_data[indexes[from + i]];
}
}
Signed-off-by: Seaven <[email protected]>
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass : 7 / 7 (100.00%)
[BE Incremental Coverage Report] ❌ fail : 32 / 83 (38.55%)
Why I'm doing:
What I'm doing:
Fixes #issue
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: