diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index df4b036347ba1f..f0a55a2a8d763c 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -773,6 +773,8 @@ endif()
 set(WL_START_GROUP "-Wl,--start-group")
 set(WL_END_GROUP "-Wl,--end-group")
+set(WL_LINK_STATIC "-Wl,-Bstatic")
+set(WL_LINK_DYNAMIC "-Wl,-Bdynamic")
 # Set starrocks libraries
 set(STARROCKS_LINK_LIBS
@@ -963,7 +965,8 @@ if (NOT ("${MAKE_TEST}" STREQUAL "ON" AND "${BUILD_FOR_SANITIZE}" STREQUAL "ON")
 endif()
 set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS}
-    -lresolv -lbfd -liberty -lc -lm -ldl -rdynamic -pthread -Wl,-wrap=__cxa_throw
+    ${WL_LINK_STATIC} -lbfd
+    ${WL_LINK_DYNAMIC} -lresolv -liberty -lc -lm -ldl -rdynamic -pthread -Wl,-wrap=__cxa_throw
 )
 # link gcov if WITH_GCOV is on
diff --git a/be/src/column/adaptive_nullable_column.h b/be/src/column/adaptive_nullable_column.h
index b247bdce6e18fa..0360f8c696a54d 100644
--- a/be/src/column/adaptive_nullable_column.h
+++ b/be/src/column/adaptive_nullable_column.h
@@ -276,9 +276,7 @@ class AdaptiveNullableColumn final : public ColumnFactory<NullableColumn, AdaptiveNullableColumn> {
     StatusOr<ColumnPtr> upgrade_if_overflow() override {
         materialized_nullable();
-        if (_null_column->capacity_limit_reached()) {
-            return Status::InternalError("Size of NullableColumn exceed the limit");
-        }
+        RETURN_IF_ERROR(_null_column->capacity_limit_reached());
         return upgrade_helper_func(&_data_column);
     }
diff --git a/be/src/column/array_column.h b/be/src/column/array_column.h
--- a/be/src/column/array_column.h
+++ b/be/src/column/array_column.h
@@ -556,9 +554,9 @@ class ArrayColumn final : public ColumnFactory<Column, ArrayColumn> {
     std::string debug_string() const override;
-    bool capacity_limit_reached(std::string* msg = nullptr) const override {
-        return _elements->capacity_limit_reached(msg) || _offsets->capacity_limit_reached(msg);
+    Status capacity_limit_reached() const override {
+        RETURN_IF_ERROR(_elements->capacity_limit_reached());
+        return _offsets->capacity_limit_reached();
     }
     StatusOr<ColumnPtr> upgrade_if_overflow() override;
diff --git a/be/src/column/binary_column.cpp b/be/src/column/binary_column.cpp
index e209e548defa01..b4e5bedc48e856 100644
--- a/be/src/column/binary_column.cpp
+++ b/be/src/column/binary_column.cpp
@@ -24,6 +24,7 @@
 #include "gutil/bits.h"
 #include "gutil/casts.h"
 #include "gutil/strings/fastmem.h"
+#include "gutil/strings/substitute.h"
 #include "util/hash_util.hpp"
 #include "util/mysql_row_buffer.h"
 #include "util/raw_container.h"
@@ -738,45 +739,37 @@ bool BinaryColumnBase<T>::has_large_column() const {
 }
 template <typename T>
-bool BinaryColumnBase<T>::capacity_limit_reached(std::string* msg) const {
+Status BinaryColumnBase<T>::capacity_limit_reached() const {
     static_assert(std::is_same_v<T, uint32_t> || std::is_same_v<T, uint64_t>);
     if constexpr (std::is_same_v<T, uint32_t>) {
         // The size limit of a single element is 2^32 - 1.
         // The size limit of all elements is 2^32 - 1.
         // The number limit of elements is 2^32 - 1.
         if (_bytes.size() >= Column::MAX_CAPACITY_LIMIT) {
-            if (msg != nullptr) {
-                msg->append("Total byte size of binary column exceed the limit: " +
-                            std::to_string(Column::MAX_CAPACITY_LIMIT));
-            }
-            return true;
+            return Status::CapacityLimitExceed(
+                    strings::Substitute("Total byte size of binary column exceed the limit: $0",
+                                        std::to_string(Column::MAX_CAPACITY_LIMIT)));
         } else if (_offsets.size() >= Column::MAX_CAPACITY_LIMIT) {
-            if (msg != nullptr) {
-                msg->append("Total row count of binary column exceed the limit: " +
-                            std::to_string(Column::MAX_CAPACITY_LIMIT));
-            }
-            return true;
+            return Status::CapacityLimitExceed(
+                    strings::Substitute("Total row count of binary column exceed the limit: $0",
+                                        std::to_string(Column::MAX_CAPACITY_LIMIT)));
         } else {
-            return false;
+            return Status::OK();
         }
     } else {
         // The size limit of a single element is 2^32 - 1.
         // The size limit of all elements is 2^64 - 1.
         // The number limit of elements is 2^32 - 1.
         if (_bytes.size() >= Column::MAX_LARGE_CAPACITY_LIMIT) {
-            if (msg != nullptr) {
-                msg->append("Total byte size of large binary column exceed the limit: " +
-                            std::to_string(Column::MAX_LARGE_CAPACITY_LIMIT));
-            }
-            return true;
+            return Status::CapacityLimitExceed(
+                    strings::Substitute("Total byte size of large binary column exceed the limit: $0",
+                                        std::to_string(Column::MAX_LARGE_CAPACITY_LIMIT)));
         } else if (_offsets.size() >= Column::MAX_CAPACITY_LIMIT) {
-            if (msg != nullptr) {
-                msg->append("Total row count of large binary column exceed the limit: " +
-                            std::to_string(Column::MAX_CAPACITY_LIMIT));
-            }
-            return true;
+            return Status::CapacityLimitExceed(
+                    strings::Substitute("Total row count of large binary column exceed the limit: $0",
+                                        std::to_string(Column::MAX_CAPACITY_LIMIT)));
         } else {
-            return false;
+            return Status::OK();
         }
     }
 }
diff --git a/be/src/column/binary_column.h b/be/src/column/binary_column.h
index 09bfac117a058b..6e0b59b374daed 100644
--- a/be/src/column/binary_column.h
+++ b/be/src/column/binary_column.h
@@ -339,7 +339,7 @@ class BinaryColumnBase final : public ColumnFactory<Column, BinaryColumnBase<T>> {
         return ss.str();
     }
-    bool capacity_limit_reached(std::string* msg = nullptr) const override;
+    Status capacity_limit_reached() const override;
 private:
     void _build_slices() const;
diff --git a/be/src/column/chunk.h b/be/src/column/chunk.h
index cf382610771115..67de2c8768540c 100644
--- a/be/src/column/chunk.h
+++ b/be/src/column/chunk.h
@@ -277,13 +277,11 @@ class Chunk {
     std::string rebuild_csv_row(size_t index, const std::string& delimiter) const;
-    bool capacity_limit_reached(std::string* msg = nullptr) const {
+    Status capacity_limit_reached() const {
         for (const auto& column : _columns) {
-            if (column->capacity_limit_reached(msg)) {
-                return true;
-            }
+            RETURN_IF_ERROR(column->capacity_limit_reached());
         }
-        return false;
+        return Status::OK();
     }
     query_cache::owner_info& owner_info() { return _owner_info; }
diff --git a/be/src/column/column.h b/be/src/column/column.h
index ec730265a0fc8e..3d86550fb09a71 100644
--- a/be/src/column/column.h
+++ b/be/src/column/column.h
@@ -394,7 +394,7 @@ class Column {
     // The interface will not free memory!!!
     virtual void reset_column() { _delete_state = DEL_NOT_SATISFIED; }
-    virtual bool capacity_limit_reached(std::string* msg = nullptr) const = 0;
+    virtual Status capacity_limit_reached() const = 0;
     virtual Status accept(ColumnVisitor* visitor) const = 0;
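The refactor above replaces the `bool capacity_limit_reached(std::string* msg)` out-parameter pattern with a `Status` return, so callers propagate the error with `RETURN_IF_ERROR` instead of hand-assembling messages. A minimal sketch of the before/after calling convention, with toy stand-ins for the real `starrocks::Status` and `RETURN_IF_ERROR` (the `ToyColumn` type and its limit are hypothetical):

```cpp
#include <iostream>
#include <string>
#include <utility>

// Toy stand-ins; the real definitions live in be/src/common/status.h.
class Status {
public:
    static Status OK() { return Status{}; }
    static Status CapacityLimitExceed(std::string msg) { return Status{std::move(msg)}; }
    bool ok() const { return _msg.empty(); }
    const std::string& message() const { return _msg; }

private:
    Status() = default;
    explicit Status(std::string msg) : _msg(std::move(msg)) {}
    std::string _msg;
};

#define RETURN_IF_ERROR(stmt)      \
    do {                           \
        Status _st = (stmt);       \
        if (!_st.ok()) return _st; \
    } while (0)

// Hypothetical column with a row-count cap, mirroring FixedLengthColumnBase.
struct ToyColumn {
    size_t rows = 0;
    static constexpr size_t kMaxRows = 1u << 20;

    // New style: the check itself carries the error message.
    Status capacity_limit_reached() const {
        if (rows > kMaxRows) {
            return Status::CapacityLimitExceed("row count exceeds the limit: " + std::to_string(kMaxRows));
        }
        return Status::OK();
    }
};

// The caller no longer threads a std::string* through every level.
Status append_checked(ToyColumn& col, size_t n) {
    col.rows += n;
    RETURN_IF_ERROR(col.capacity_limit_reached());
    return Status::OK();
}

int main() {
    ToyColumn col;
    Status st = append_checked(col, (1u << 20) + 1);
    std::cout << (st.ok() ? "ok" : st.message()) << '\n';
}
```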
"simd/simd.h" #include "types/logical_type.h" #include "util/hash_util.hpp" @@ -430,14 +431,12 @@ void JsonColumn::reset_column() { _path_to_index.clear(); } -bool JsonColumn::capacity_limit_reached(std::string* msg) const { +Status JsonColumn::capacity_limit_reached() const { if (size() > Column::MAX_CAPACITY_LIMIT) { - if (msg != nullptr) { - msg->append("row count of object column exceed the limit: " + std::to_string(Column::MAX_CAPACITY_LIMIT)); - } - return true; + return Status::CapacityLimitExceed(strings::Substitute("row count of object column exceed the limit: $0", + std::to_string(Column::MAX_CAPACITY_LIMIT))); } - return false; + return Status::OK(); } void JsonColumn::check_or_die() const { diff --git a/be/src/column/json_column.h b/be/src/column/json_column.h index ca71de64940d16..935bb84333ed8c 100644 --- a/be/src/column/json_column.h +++ b/be/src/column/json_column.h @@ -101,7 +101,7 @@ class JsonColumn final : public ColumnFactory, JsonColum void swap_column(Column& rhs) override; void reset_column() override; - bool capacity_limit_reached(std::string* msg = nullptr) const override; + Status capacity_limit_reached() const override; void check_or_die() const override; // support flat json on storage diff --git a/be/src/column/map_column.h b/be/src/column/map_column.h index 4617b225ff0741..15c3fd9a4e65a9 100644 --- a/be/src/column/map_column.h +++ b/be/src/column/map_column.h @@ -164,9 +164,10 @@ class MapColumn final : public ColumnFactory { std::string debug_string() const override; - bool capacity_limit_reached(std::string* msg = nullptr) const override { - return _keys->capacity_limit_reached(msg) || _values->capacity_limit_reached(msg) || - _offsets->capacity_limit_reached(msg); + Status capacity_limit_reached() const override { + RETURN_IF_ERROR(_keys->capacity_limit_reached()); + RETURN_IF_ERROR(_values->capacity_limit_reached()); + return _offsets->capacity_limit_reached(); } StatusOr upgrade_if_overflow() override; diff --git a/be/src/column/nullable_column.cpp b/be/src/column/nullable_column.cpp index ae601bfe6d5245..0aae77ce4297ea 100644 --- a/be/src/column/nullable_column.cpp +++ b/be/src/column/nullable_column.cpp @@ -430,9 +430,7 @@ void NullableColumn::check_or_die() const { } StatusOr NullableColumn::upgrade_if_overflow() { - if (_null_column->capacity_limit_reached()) { - return Status::InternalError("Size of NullableColumn exceed the limit"); - } + RETURN_IF_ERROR(_null_column->capacity_limit_reached()); return upgrade_helper_func(&_data_column); } diff --git a/be/src/column/nullable_column.h b/be/src/column/nullable_column.h index 3fa7f64dac2fbb..304f57a4d38f04 100644 --- a/be/src/column/nullable_column.h +++ b/be/src/column/nullable_column.h @@ -311,8 +311,9 @@ class NullableColumn : public ColumnFactory { return ss.str(); } - bool capacity_limit_reached(std::string* msg = nullptr) const override { - return _data_column->capacity_limit_reached(msg) || _null_column->capacity_limit_reached(msg); + Status capacity_limit_reached() const override { + RETURN_IF_ERROR(_data_column->capacity_limit_reached()); + return _null_column->capacity_limit_reached(); } void check_or_die() const override; diff --git a/be/src/column/object_column.cpp b/be/src/column/object_column.cpp index da13ca585a30ec..3e3b061d1666b7 100644 --- a/be/src/column/object_column.cpp +++ b/be/src/column/object_column.cpp @@ -328,9 +328,7 @@ std::string ObjectColumn::debug_item(size_t idx) const { template StatusOr ObjectColumn::upgrade_if_overflow() { - if (capacity_limit_reached()) { - return 
Status::InternalError("Size of ObjectColumn exceed the limit"); - } + RETURN_IF_ERROR(capacity_limit_reached()); return nullptr; } diff --git a/be/src/column/object_column.h b/be/src/column/object_column.h index 25ec0e851ecc73..482b68ec7f1a02 100644 --- a/be/src/column/object_column.h +++ b/be/src/column/object_column.h @@ -20,6 +20,7 @@ #include "column/datum.h" #include "column/vectorized_fwd.h" #include "common/object_pool.h" +#include "gutil/strings/substitute.h" #include "types/bitmap_value.h" #include "types/hll.h" #include "util/json.h" @@ -213,15 +214,12 @@ class ObjectColumn : public ColumnFactory> { return ss.str(); } - bool capacity_limit_reached(std::string* msg = nullptr) const override { + Status capacity_limit_reached() const override { if (_pool.size() > Column::MAX_CAPACITY_LIMIT) { - if (msg != nullptr) { - msg->append("row count of object column exceed the limit: " + - std::to_string(Column::MAX_CAPACITY_LIMIT)); - } - return true; + return Status::CapacityLimitExceed(strings::Substitute("row count of object column exceed the limit: $0", + std::to_string(Column::MAX_CAPACITY_LIMIT))); } - return false; + return Status::OK(); } StatusOr upgrade_if_overflow() override; @@ -230,7 +228,7 @@ class ObjectColumn : public ColumnFactory> { bool has_large_column() const override { return false; } - void check_or_die() const {} + void check_or_die() const override {} private: // add this to avoid warning clang-diagnostic-overloaded-virtual diff --git a/be/src/column/struct_column.cpp b/be/src/column/struct_column.cpp index a921e0ed147368..a1482d5ef2cf86 100644 --- a/be/src/column/struct_column.cpp +++ b/be/src/column/struct_column.cpp @@ -447,12 +447,11 @@ void StructColumn::swap_column(Column& rhs) { // _field_names dont need swap } -bool StructColumn::capacity_limit_reached(std::string* msg) const { - bool res = false; +Status StructColumn::capacity_limit_reached() const { for (const auto& column : _fields) { - res = res || column->capacity_limit_reached(msg); + RETURN_IF_ERROR(column->capacity_limit_reached()); } - return res; + return Status::OK(); } void StructColumn::check_or_die() const { diff --git a/be/src/column/struct_column.h b/be/src/column/struct_column.h index 77532683cc7a44..1bf73ceba349bc 100644 --- a/be/src/column/struct_column.h +++ b/be/src/column/struct_column.h @@ -171,7 +171,7 @@ class StructColumn final : public ColumnFactory { void reset_column() override; - bool capacity_limit_reached(std::string* msg = nullptr) const override; + Status capacity_limit_reached() const override; void check_or_die() const override; diff --git a/be/src/common/config.h b/be/src/common/config.h index 308873f7127be1..0dce0fee0901fd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1251,6 +1251,8 @@ CONF_String(rocksdb_db_options_string, "create_if_missing=true;create_missing_co CONF_Int64(local_exchange_buffer_mem_limit_per_driver, "134217728"); // 128MB // only used for test. 
default: 128M CONF_mInt64(streaming_agg_limited_memory_size, "134217728"); +// mem limit for partition hash join probe side buffer +CONF_mInt64(partition_hash_join_probe_limit_size, "134217728"); // pipeline streaming aggregate chunk buffer size CONF_mInt32(streaming_agg_chunk_buffer_size, "1024"); CONF_mInt64(wait_apply_time, "6000"); // 6s @@ -1297,7 +1299,7 @@ CONF_mInt32(finish_publish_version_internal, "100"); CONF_mBool(enable_stream_load_verbose_log, "false"); -CONF_mInt32(get_txn_status_internal_sec, "30"); +CONF_mInt32(get_txn_status_internal_sec, "10"); CONF_mBool(dump_metrics_with_bvar, "true"); @@ -1375,10 +1377,6 @@ CONF_mDouble(connector_sink_mem_urgent_space_ratio, "0.1"); // .crm file can be removed after 1day. CONF_mInt32(unused_crm_file_threshold_second, "86400" /** 1day **/); -// When the keys that we want to delete, number of them is larger than this config, -// we will fallback and using `DeleteRange` in rocksdb. -CONF_mInt32(rocksdb_opt_delete_range_limit, "500"); - // python envs config // create time worker timeout CONF_mInt32(create_child_worker_timeout_ms, "1000"); diff --git a/be/src/exec/analytor.cpp b/be/src/exec/analytor.cpp index b4ee5c87d60c01..95a273d8250222 100644 --- a/be/src/exec/analytor.cpp +++ b/be/src/exec/analytor.cpp @@ -586,13 +586,6 @@ Status Analytor::_add_chunk(const ChunkPtr& chunk) { const size_t chunk_size = chunk->num_rows(); { - auto check_if_overflow = [](Column* column) { - std::string msg; - if (column->capacity_limit_reached(&msg)) { - return Status::InternalError(msg); - } - return Status::OK(); - }; SCOPED_TIMER(_column_resize_timer); for (size_t i = 0; i < _agg_fn_ctxs.size(); i++) { for (size_t j = 0; j < _agg_expr_ctxs[i].size(); j++) { @@ -601,20 +594,20 @@ Status Analytor::_add_chunk(const ChunkPtr& chunk) { // When chunk's column is const, maybe need to unpack it. 
TRY_CATCH_BAD_ALLOC(_append_column(chunk_size, _agg_intput_columns[i][j].get(), column)); - RETURN_IF_ERROR(check_if_overflow(_agg_intput_columns[i][j].get())); + RETURN_IF_ERROR(_agg_intput_columns[i][j]->capacity_limit_reached()); } } for (size_t i = 0; i < _partition_ctxs.size(); i++) { ASSIGN_OR_RETURN(ColumnPtr column, _partition_ctxs[i]->evaluate(chunk.get())); TRY_CATCH_BAD_ALLOC(_append_column(chunk_size, _partition_columns[i].get(), column)); - RETURN_IF_ERROR(check_if_overflow(_partition_columns[i].get())); + RETURN_IF_ERROR(_partition_columns[i]->capacity_limit_reached()); } for (size_t i = 0; i < _order_ctxs.size(); i++) { ASSIGN_OR_RETURN(ColumnPtr column, _order_ctxs[i]->evaluate(chunk.get())); TRY_CATCH_BAD_ALLOC(_append_column(chunk_size, _order_columns[i].get(), column)); - RETURN_IF_ERROR(check_if_overflow(_order_columns[i].get())); + RETURN_IF_ERROR(_order_columns[i]->capacity_limit_reached()); } } diff --git a/be/src/exec/hash_join_components.cpp b/be/src/exec/hash_join_components.cpp index b7755416f7f895..86991a632c1058 100644 --- a/be/src/exec/hash_join_components.cpp +++ b/be/src/exec/hash_join_components.cpp @@ -14,12 +14,23 @@ #include "exec/hash_join_components.h" +#include #include +#include #include "column/vectorized_fwd.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/object_pool.h" #include "exec/hash_joiner.h" #include "exec/join_hash_map.h" +#include "exprs/agg/distinct.h" +#include "exprs/expr_context.h" #include "gutil/casts.h" +#include "runtime/descriptors.h" +#include "runtime/mem_tracker.h" +#include "util/cpu_info.h" +#include "util/runtime_profile.h" namespace starrocks { @@ -28,12 +39,16 @@ class SingleHashJoinProberImpl final : public HashJoinProberImpl { SingleHashJoinProberImpl(HashJoiner& hash_joiner) : HashJoinProberImpl(hash_joiner) {} ~SingleHashJoinProberImpl() override = default; bool probe_chunk_empty() const override { return _probe_chunk == nullptr; } + Status on_input_finished(RuntimeState* state) override { return Status::OK(); } Status push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk) override; StatusOr probe_chunk(RuntimeState* state) override; StatusOr probe_remain(RuntimeState* state, bool* has_remain) override; - void reset() override { + void reset(RuntimeState* runtime_state) override { _probe_chunk.reset(); _current_probe_has_remain = false; + if (_hash_table != nullptr) { + _hash_table->reset_probe_state(runtime_state); + } } void set_ht(JoinHashTable* hash_table) { _hash_table = hash_table; } @@ -85,6 +100,241 @@ void HashJoinProber::attach(HashJoinBuilder* builder, const HashJoinProbeMetrics _impl = builder->create_prober(); } +class PartitionChunkChannel { +public: + PartitionChunkChannel(MemTracker* tracker) : _tracker(tracker) {} + bool processing() const { return _processing; } + void set_processing(bool processing) { _processing = processing; } + + ChunkPtr pull() { + auto chunk = std::move(_chunks.front()); + _tracker->release(chunk->memory_usage()); + _chunks.pop_front(); + return chunk; + } + + void push(ChunkPtr&& chunk) { + _tracker->consume(chunk->memory_usage()); + _chunks.emplace_back(std::move(chunk)); + } + + const ChunkPtr& back() { return _chunks.back(); } + + bool is_full() const { + return _chunks.size() >= 4 || _tracker->consumption() > config::partition_hash_join_probe_limit_size; + } + + size_t size() const { return _chunks.size(); } + + bool is_empty() const { return _chunks.empty() || _chunks.front()->is_empty(); } + + bool not_empty() const { return !is_empty(); } 
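`PartitionChunkChannel` is a small bounded buffer: every chunk entering a partition is charged to a shared `MemTracker`, and `is_full()` caps both the queue depth (4 chunks) and the tracked bytes (`config::partition_hash_join_probe_limit_size`, 128MB by default). A standalone sketch of the same accounting idea, with a toy tracker and chunk in place of the real `MemTracker`/`Chunk` types:

```cpp
#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>
#include <vector>

// Toy stand-ins: a byte tracker shared by all partitions, and a "chunk"
// that knows its own memory footprint.
struct ToyTracker {
    int64_t used = 0;
    void consume(int64_t bytes) { used += bytes; }
    void release(int64_t bytes) { used -= bytes; }
};

struct ToyChunk {
    std::vector<int32_t> rows;
    int64_t memory_usage() const { return static_cast<int64_t>(rows.size() * sizeof(int32_t)); }
};
using ToyChunkPtr = std::shared_ptr<ToyChunk>;

// Same shape as PartitionChunkChannel: push/pull keep the tracker in sync,
// and is_full() bounds both queue depth and total tracked bytes.
class ToyChannel {
public:
    ToyChannel(ToyTracker* tracker, int64_t byte_limit) : _tracker(tracker), _byte_limit(byte_limit) {}

    void push(ToyChunkPtr&& chunk) {
        _tracker->consume(chunk->memory_usage());
        _chunks.emplace_back(std::move(chunk));
    }

    ToyChunkPtr pull() {
        auto chunk = std::move(_chunks.front());
        _tracker->release(chunk->memory_usage());
        _chunks.pop_front();
        return chunk;
    }

    bool is_full() const { return _chunks.size() >= 4 || _tracker->used > _byte_limit; }

private:
    ToyTracker* _tracker;
    int64_t _byte_limit;
    std::deque<ToyChunkPtr> _chunks;
};

int main() {
    ToyTracker tracker;
    ToyChannel channel(&tracker, /*byte_limit=*/1024);
    channel.push(std::make_shared<ToyChunk>(ToyChunk{std::vector<int32_t>(300)}));
    std::cout << "tracked bytes: " << tracker.used << ", full: " << channel.is_full() << '\n';
    auto chunk = channel.pull();
    std::cout << "tracked bytes after pull: " << tracker.used << '\n';
}
```

Because the tracker is shared across all channels of one prober, a single partition receiving skewed input can trip the byte limit for everyone, which forces buffered chunks to be flushed into the per-partition probers early.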
+class PartitionedHashJoinProberImpl final : public HashJoinProberImpl {
+public:
+    PartitionedHashJoinProberImpl(HashJoiner& hash_joiner) : HashJoinProberImpl(hash_joiner) {}
+    ~PartitionedHashJoinProberImpl() override = default;
+    bool probe_chunk_empty() const override;
+    Status on_input_finished(RuntimeState* state) override;
+    Status push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk) override;
+    StatusOr<ChunkPtr> probe_chunk(RuntimeState* state) override;
+    StatusOr<ChunkPtr> probe_remain(RuntimeState* state, bool* has_remain) override;
+    void reset(RuntimeState* runtime_state) override;
+    void set_probers(std::vector<std::unique_ptr<SingleHashJoinProberImpl>>&& probers) {
+        _probers = std::move(probers);
+        _partition_input_channels.resize(_probers.size(), PartitionChunkChannel(&_mem_tracker));
+    }
+
+private:
+    MemTracker _mem_tracker;
+    bool _all_input_finished = false;
+    int32_t _remain_partition_idx = 0;
+    std::vector<std::unique_ptr<SingleHashJoinProberImpl>> _probers;
+    std::vector<PartitionChunkChannel> _partition_input_channels;
+};
+
+bool PartitionedHashJoinProberImpl::probe_chunk_empty() const {
+    auto& probers = _probers;
+    size_t num_partitions = probers.size();
+
+    if (!_all_input_finished) {
+        for (size_t i = 0; i < num_partitions; ++i) {
+            if (!probers[i]->probe_chunk_empty() || _partition_input_channels[i].processing()) {
+                return false;
+            }
+        }
+    } else {
+        for (size_t i = 0; i < num_partitions; ++i) {
+            if (!probers[i]->probe_chunk_empty() || _partition_input_channels[i].not_empty()) {
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
+Status PartitionedHashJoinProberImpl::on_input_finished(RuntimeState* runtime_state) {
+    SCOPED_TIMER(_hash_joiner.probe_metrics().partition_probe_overhead);
+    _all_input_finished = true;
+    auto& probers = _probers;
+    size_t num_partitions = probers.size();
+
+    for (size_t i = 0; i < num_partitions; ++i) {
+        if (_partition_input_channels[i].is_empty()) {
+            continue;
+        }
+        if (!probers[i]->probe_chunk_empty()) {
+            continue;
+        }
+        RETURN_IF_ERROR(probers[i]->push_probe_chunk(runtime_state, _partition_input_channels[i].pull()));
+    }
+    return Status::OK();
+}
+
+Status PartitionedHashJoinProberImpl::push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk) {
+    SCOPED_TIMER(_hash_joiner.probe_metrics().partition_probe_overhead);
+    auto& probers = _probers;
+    auto& partition_keys = _hash_joiner.probe_expr_ctxs();
+
+    size_t num_rows = chunk->num_rows();
+    size_t num_partitions = probers.size();
+    size_t num_partition_cols = partition_keys.size();
+
+    std::vector<ColumnPtr> partition_columns(num_partition_cols);
+    for (size_t i = 0; i < num_partition_cols; ++i) {
+        ASSIGN_OR_RETURN(partition_columns[i], partition_keys[i]->evaluate(chunk.get()));
+    }
+    std::vector<uint32_t> hash_values;
+    {
+        hash_values.assign(num_rows, HashUtil::FNV_SEED);
+
+        for (const ColumnPtr& column : partition_columns) {
+            column->fnv_hash(hash_values.data(), 0, num_rows);
+        }
+        // find partition id
+        for (size_t i = 0; i < hash_values.size(); ++i) {
+            hash_values[i] = HashUtil::fmix32(hash_values[i]) & (num_partitions - 1);
+        }
+    }
+
+    const auto& partitions = hash_values;
+
+    std::vector<uint32_t> selection;
+    selection.resize(chunk->num_rows());
+
+    std::vector<uint32_t> channel_row_idx_start_points;
+    channel_row_idx_start_points.assign(num_partitions + 1, 0);
+
+    for (uint32_t i : partitions) {
+        channel_row_idx_start_points[i]++;
+    }
+
+    for (int32_t i = 1; i <= channel_row_idx_start_points.size() - 1; ++i) {
+        channel_row_idx_start_points[i] += channel_row_idx_start_points[i - 1];
+    }
+
+    for (int32_t i = chunk->num_rows() - 1; i >= 0; --i) {
+        selection[channel_row_idx_start_points[partitions[i]] - 1] = i;
+        channel_row_idx_start_points[partitions[i]]--;
+    }
+    _partition_input_channels.resize(num_partitions, PartitionChunkChannel(&_mem_tracker));
+
+    for (size_t i = 0; i < num_partitions; ++i) {
+        auto from = channel_row_idx_start_points[i];
+        auto size = channel_row_idx_start_points[i + 1] - from;
+        if (size == 0) {
+            continue;
+        }
+
+        if (_partition_input_channels[i].is_empty()) {
+            _partition_input_channels[i].push(chunk->clone_empty());
+        }
+
+        if (_partition_input_channels[i].back()->num_rows() + size <= 4096) {
+            _partition_input_channels[i].back()->append_selective(*chunk, selection.data(), from, size);
+        } else {
+            _partition_input_channels[i].push(chunk->clone_empty());
+            _partition_input_channels[i].back()->append_selective(*chunk, selection.data(), from, size);
+        }
+
+        if (_partition_input_channels[i].is_full()) {
+            _partition_input_channels[i].set_processing(true);
+            RETURN_IF_ERROR(probers[i]->push_probe_chunk(state, _partition_input_channels[i].pull()));
+        }
+    }
+
+    return Status::OK();
+}
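`push_probe_chunk` routes rows with a counting sort: histogram the rows per partition, prefix-sum the counts into offsets, then walk the chunk backwards filling a selection vector so each partition's row indices end up contiguous and in order. A self-contained sketch of that routine (a plain power-of-two mask stands in for the `fnv_hash`/`fmix32` pipeline; names are illustrative):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Build a selection vector that groups row indices by partition, exactly like
// the counting-sort pass in push_probe_chunk / _append_chunk_to_partitions.
// Returns per-partition [start, end) offsets into `selection`.
std::vector<uint32_t> partition_rows(const std::vector<uint32_t>& hashes, size_t num_partitions,
                                     std::vector<uint32_t>* selection) {
    // num_partitions must be a power of two for the mask trick to work.
    const uint32_t mask = static_cast<uint32_t>(num_partitions - 1);
    const size_t num_rows = hashes.size();

    std::vector<uint32_t> partitions(num_rows);
    for (size_t i = 0; i < num_rows; ++i) {
        partitions[i] = hashes[i] & mask; // the real code applies fmix32 first
    }

    // 1) histogram of rows per partition
    std::vector<uint32_t> bounds(num_partitions + 1, 0);
    for (uint32_t p : partitions) {
        bounds[p]++;
    }
    // 2) prefix sum: bounds[i] becomes the end offset of partition i
    for (size_t i = 1; i < bounds.size(); ++i) {
        bounds[i] += bounds[i - 1];
    }
    // 3) walk backwards, placing each row just below its partition's cursor;
    //    afterwards bounds[i] is the start offset of partition i.
    selection->resize(num_rows);
    for (int64_t i = static_cast<int64_t>(num_rows) - 1; i >= 0; --i) {
        (*selection)[--bounds[partitions[i]]] = static_cast<uint32_t>(i);
    }
    return bounds;
}

int main() {
    std::vector<uint32_t> hashes = {7, 2, 5, 0, 3, 6, 1, 4};
    std::vector<uint32_t> selection;
    auto bounds = partition_rows(hashes, /*num_partitions=*/4, &selection);
    for (size_t p = 0; p < 4; ++p) {
        std::cout << "partition " << p << ": rows";
        for (uint32_t k = bounds[p]; k < bounds[p + 1]; ++k) std::cout << ' ' << selection[k];
        std::cout << '\n';
    }
}
```

Walking the chunk backwards while decrementing the cursors keeps the sort stable, so `append_selective` copies each partition's rows in their original order with one contiguous range per partition.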
+StatusOr<ChunkPtr> PartitionedHashJoinProberImpl::probe_chunk(RuntimeState* state) {
+    auto& probers = _probers;
+    size_t num_partitions = probers.size();
+    if (_all_input_finished) {
+        for (size_t i = 0; i < num_partitions; ++i) {
+            if (probers[i]->probe_chunk_empty() && _partition_input_channels[i].is_empty()) {
+                continue;
+            }
+            if (probers[i]->probe_chunk_empty()) {
+                RETURN_IF_ERROR(probers[i]->push_probe_chunk(state, _partition_input_channels[i].pull()));
+            }
+            auto chunk = std::make_shared<Chunk>();
+            ASSIGN_OR_RETURN(chunk, probers[i]->probe_chunk(state))
+            return chunk;
+        }
+    } else {
+        for (size_t i = 0; i < num_partitions; ++i) {
+            if (probers[i]->probe_chunk_empty() && !_partition_input_channels[i].processing()) {
+                continue;
+            }
+            if (probers[i]->probe_chunk_empty()) {
+                RETURN_IF_ERROR(probers[i]->push_probe_chunk(state, _partition_input_channels[i].pull()));
+            }
+            _partition_input_channels[i].set_processing(_partition_input_channels[i].size() > 1);
+            auto chunk = std::make_shared<Chunk>();
+            ASSIGN_OR_RETURN(chunk, probers[i]->probe_chunk(state))
+            return chunk;
+        }
+    }
+    CHECK(false);
+
+    return nullptr;
+}
+
+StatusOr<ChunkPtr> PartitionedHashJoinProberImpl::probe_remain(RuntimeState* state, bool* has_remain) {
+    auto& probers = _probers;
+    size_t num_partitions = probers.size();
+    while (_remain_partition_idx < num_partitions) {
+        auto chunk = std::make_shared<Chunk>();
+        bool sub_map_has_remain = false;
+        ASSIGN_OR_RETURN(chunk, probers[_remain_partition_idx]->probe_remain(state, &sub_map_has_remain));
+        if (!sub_map_has_remain) {
+            _remain_partition_idx++;
+        }
+        if (chunk->is_empty()) {
+            continue;
+        }
+        *has_remain = true;
+        return chunk;
+    }
+
+    *has_remain = false;
+    return nullptr;
+}
+
+void PartitionedHashJoinProberImpl::reset(RuntimeState* runtime_state) {
+    _probers.clear();
+    _partition_input_channels.clear();
+    _all_input_finished = false;
+    _remain_partition_idx = 0;
+}
+
 void SingleHashJoinBuilder::create(const HashTableParam& param) {
     _ht.create(param);
 }
@@ -99,11 +349,6 @@ void SingleHashJoinBuilder::reset(const HashTableParam& param) {
     create(param);
 }
-void SingleHashJoinBuilder::reset_probe(RuntimeState* state) {
-    _key_columns.clear();
-    _ht.reset_probe_state(state);
-}
-
 bool SingleHashJoinBuilder::anti_join_key_column_has_null() const {
     if (_ht.get_key_columns().size() != 1) {
         return false;
     }
@@ -155,4 +400,388 @@ ChunkPtr SingleHashJoinBuilder::convert_to_spill_schema(const ChunkPtr& chunk) const {
     return _ht.convert_to_spill_schema(chunk);
 }
+enum class CacheLevel { L2, L3, MEMORY };
+
+class AdaptivePartitionHashJoinBuilder final : public HashJoinBuilder {
+public:
+    AdaptivePartitionHashJoinBuilder(HashJoiner& hash_joiner);
+    ~AdaptivePartitionHashJoinBuilder() override = default;
+
+    void create(const HashTableParam& param) override;
+
+    void close() override;
+
+    void reset(const HashTableParam& param) override;
+
+    Status do_append_chunk(const ChunkPtr& chunk) override;
+
+    Status build(RuntimeState* state) override;
+
+    bool anti_join_key_column_has_null() const override;
+
+    int64_t ht_mem_usage() const override;
+
+    void get_build_info(size_t* bucket_size, float* avg_keys_per_bucket) override;
+
+    size_t get_output_probe_column_count() const override;
+    size_t get_output_build_column_count() const override;
+
+    void visitHt(const std::function<void(JoinHashTable*)>& visitor) override;
+
+    std::unique_ptr<HashJoinProberImpl> create_prober() override;
+
+    void clone_readable(HashJoinBuilder* builder) override;
+
+    ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const override;
+
+private:
+    size_t _estimated_row_size(const HashTableParam& param) const;
+    size_t _estimated_probe_cost(const HashTableParam& param) const;
+    template <CacheLevel level>
+    size_t _estimated_build_cost(size_t build_row_size) const;
+    void _adjust_partition_rows(size_t build_row_size);
+
+    void _init_partition_nums(const HashTableParam& param);
+    Status _convert_to_single_partition();
+    Status _append_chunk_to_partitions(const ChunkPtr& chunk);
+
+private:
+    std::vector<std::unique_ptr<SingleHashJoinBuilder>> _builders;
+
+    size_t _partition_num = 0;
+    size_t _partition_join_min_rows = 0;
+    size_t _partition_join_max_rows = 0;
+
+    size_t _probe_estimated_costs = 0;
+
+    size_t _fit_L2_cache_max_rows = 0;
+    size_t _fit_L3_cache_max_rows = 0;
+
+    size_t _L2_cache_size = 0;
+    size_t _L3_cache_size = 0;
+
+    size_t _pushed_chunks = 0;
+};
+
+AdaptivePartitionHashJoinBuilder::AdaptivePartitionHashJoinBuilder(HashJoiner& hash_joiner)
+        : HashJoinBuilder(hash_joiner) {
+    static constexpr size_t DEFAULT_L2_CACHE_SIZE = 1 * 1024 * 1024;
+    static constexpr size_t DEFAULT_L3_CACHE_SIZE = 32 * 1024 * 1024;
+    const auto& cache_sizes = CpuInfo::get_cache_sizes();
+    _L2_cache_size = cache_sizes[CpuInfo::L2_CACHE];
+    _L3_cache_size = cache_sizes[CpuInfo::L3_CACHE];
+    _L2_cache_size = _L2_cache_size ? _L2_cache_size : DEFAULT_L2_CACHE_SIZE;
+    _L3_cache_size = _L3_cache_size ? _L3_cache_size : DEFAULT_L3_CACHE_SIZE;
+}
+
+size_t AdaptivePartitionHashJoinBuilder::_estimated_row_size(const HashTableParam& param) const {
+    size_t estimated_each_row = 0;
+
+    for (auto* tuple : param.build_row_desc->tuple_descriptors()) {
+        for (auto slot : tuple->slots()) {
+            if (param.build_output_slots.contains(slot->id())) {
+                estimated_each_row += get_size_of_fixed_length_type(slot->type().type);
+                estimated_each_row += type_estimated_overhead_bytes(slot->type().type);
+            }
+        }
+    }
+
+    // for hash table bucket
+    estimated_each_row += 4;
+
+    return estimated_each_row;
+}
+
+// We could use a better estimation model.
+size_t AdaptivePartitionHashJoinBuilder::_estimated_probe_cost(const HashTableParam& param) const {
+    size_t size = 0;
+
+    for (auto* tuple : param.probe_row_desc->tuple_descriptors()) {
+        for (auto slot : tuple->slots()) {
+            if (param.probe_output_slots.contains(slot->id())) {
+                size += get_size_of_fixed_length_type(slot->type().type);
+                size += type_estimated_overhead_bytes(slot->type().type);
+            }
+        }
+    }
+    // we define the probe cost as the byte size * 6
+    return size * 6;
+}
+
+template <>
+size_t AdaptivePartitionHashJoinBuilder::_estimated_build_cost<CacheLevel::L2>(size_t build_row_size) const {
+    return build_row_size / 2;
+}
+
+template <>
+size_t AdaptivePartitionHashJoinBuilder::_estimated_build_cost<CacheLevel::L3>(size_t build_row_size) const {
+    return build_row_size;
+}
+
+template <>
+size_t AdaptivePartitionHashJoinBuilder::_estimated_build_cost<CacheLevel::MEMORY>(size_t build_row_size) const {
+    return build_row_size * 2;
+}
+
+void AdaptivePartitionHashJoinBuilder::_adjust_partition_rows(size_t build_row_size) {
+    build_row_size = std::max(build_row_size, 4UL);
+    _fit_L2_cache_max_rows = _L2_cache_size / build_row_size;
+    _fit_L3_cache_max_rows = _L3_cache_size / build_row_size;
+
+    // If the hash table is smaller than the L2 cache, we don't think partition hash join is needed.
+    _partition_join_min_rows = _fit_L2_cache_max_rows;
+    // If the hash table after partitioning can't be loaded into L3, we don't think partition hash join is needed.
+    _partition_join_max_rows = _fit_L3_cache_max_rows * _partition_num;
+
+    if (_probe_estimated_costs + _estimated_build_cost<CacheLevel::L2>(build_row_size) <
+        _estimated_build_cost<CacheLevel::MEMORY>(build_row_size)) {
+        // overhead after hash table partitioning + probe extra cost < cost before partitioning
+        // nothing to do
+    } else if (_probe_estimated_costs + _estimated_build_cost<CacheLevel::L3>(build_row_size) <
+               _estimated_build_cost<CacheLevel::MEMORY>(build_row_size)) {
+        // It is only after this that performance gains can be realized beyond the L3 cache.
+        _partition_join_min_rows = _fit_L3_cache_max_rows;
+    } else {
+        // Partitioned joins don't have performance gains. Not using partition hash join.
+        _partition_num = 1;
+    }
+
+    VLOG_OPERATOR << "TRACE:"
+                  << "partition_num=" << _partition_num << " partition_join_min_rows=" << _partition_join_min_rows
+                  << " partition_join_max_rows=" << _partition_join_max_rows << " probe cost=" << _probe_estimated_costs
+                  << " build cost L2=" << _estimated_build_cost<CacheLevel::L2>(build_row_size)
+                  << " build cost L3=" << _estimated_build_cost<CacheLevel::L3>(build_row_size)
+                  << " build cost Mem=" << _estimated_build_cost<CacheLevel::MEMORY>(build_row_size);
+}
+
+void AdaptivePartitionHashJoinBuilder::_init_partition_nums(const HashTableParam& param) {
+    _partition_num = 16;
+
+    size_t estimated_bytes_each_row = _estimated_row_size(param);
+
+    _probe_estimated_costs = _estimated_probe_cost(param);
+
+    _adjust_partition_rows(estimated_bytes_each_row);
+
+    COUNTER_SET(_hash_joiner.build_metrics().partition_nums, (int64_t)_partition_num);
+}
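The tuning above reduces to a little arithmetic: the measured row size and the L2/L3 cache sizes bound how many build rows still fit each cache level, and the probe/build cost estimates decide whether partitioning pays for itself. A worked sketch of that decision with fixed inputs (the 1MB/32MB defaults and 16 partitions come from the patch; the row sizes and the `CacheLevel` template arguments in the comparison are reconstructed assumptions):

```cpp
#include <cstddef>
#include <iostream>

// Mirrors AdaptivePartitionHashJoinBuilder's threshold math with fixed inputs.
int main() {
    const size_t l2_cache = 1u << 20;   // 1MB, DEFAULT_L2_CACHE_SIZE
    const size_t l3_cache = 32u << 20;  // 32MB, DEFAULT_L3_CACHE_SIZE
    const size_t partition_num = 16;
    const size_t build_row_size = 64;   // bytes per build row (example)
    const size_t probe_row_size = 8;    // bytes per probe-output row (example)

    // Cost model from the template specializations:
    // L2 hit ~ row/2, L3 hit ~ row, memory ~ row*2; probe overhead ~ bytes*6.
    const size_t build_cost_l2 = build_row_size / 2;   // 32
    const size_t build_cost_l3 = build_row_size;       // 64
    const size_t build_cost_mem = build_row_size * 2;  // 128
    const size_t probe_cost = probe_row_size * 6;      // 48

    const size_t fit_l2_rows = l2_cache / build_row_size; // 16384 rows
    const size_t fit_l3_rows = l3_cache / build_row_size; // 524288 rows

    size_t min_rows = fit_l2_rows;                 // below this, don't bother partitioning
    size_t max_rows = fit_l3_rows * partition_num; // above this, partitions spill out of L3

    if (probe_cost + build_cost_l2 < build_cost_mem) {
        // partitioning wins as soon as the table no longer fits L2 (48 + 32 < 128)
    } else if (probe_cost + build_cost_l3 < build_cost_mem) {
        min_rows = fit_l3_rows; // only worth it once the table exceeds L3
    } else {
        std::cout << "partitioning never pays off for these sizes\n";
    }
    std::cout << "partition between " << min_rows << " and " << max_rows << " build rows\n";
}
```

With these numbers the first branch is taken, so the builder partitions whenever the build side holds between 16,384 and 8,388,608 rows; a wider probe output raises `probe_cost` and pushes the decision toward a single partition.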
+void AdaptivePartitionHashJoinBuilder::create(const HashTableParam& param) {
+    _init_partition_nums(param);
+    for (size_t i = 0; i < _partition_num; ++i) {
+        _builders.emplace_back(std::make_unique<SingleHashJoinBuilder>(_hash_joiner));
+        _builders.back()->create(param);
+    }
+}
+
+void AdaptivePartitionHashJoinBuilder::close() {
+    for (const auto& builder : _builders) {
+        builder->close();
+    }
+    _builders.clear();
+    _partition_num = 0;
+    _partition_join_min_rows = 0;
+    _partition_join_max_rows = 0;
+    _probe_estimated_costs = 0;
+    _fit_L2_cache_max_rows = 0;
+    _fit_L3_cache_max_rows = 0;
+    _pushed_chunks = 0;
+}
+
+void AdaptivePartitionHashJoinBuilder::reset(const HashTableParam& param) {
+    close();
+    create(param);
+}
+
+bool AdaptivePartitionHashJoinBuilder::anti_join_key_column_has_null() const {
+    return std::any_of(_builders.begin(), _builders.end(),
+                       [](const auto& builder) { return builder->anti_join_key_column_has_null(); });
+}
+
+void AdaptivePartitionHashJoinBuilder::get_build_info(size_t* bucket_size, float* avg_keys_per_bucket) {
+    size_t total_bucket_size = 0;
+    float total_keys_per_bucket = 0;
+    for (const auto& builder : _builders) {
+        size_t bucket_size = 0;
+        float keys_per_bucket = 0;
+        builder->get_build_info(&bucket_size, &keys_per_bucket);
+        total_bucket_size += bucket_size;
+        total_keys_per_bucket += keys_per_bucket;
+    }
+    *bucket_size = total_bucket_size;
+    *avg_keys_per_bucket = total_keys_per_bucket / _builders.size();
+}
+
+size_t AdaptivePartitionHashJoinBuilder::get_output_probe_column_count() const {
+    return _builders[0]->get_output_probe_column_count();
+}
+
+size_t AdaptivePartitionHashJoinBuilder::get_output_build_column_count() const {
+    return _builders[0]->get_output_build_column_count();
+}
+
+int64_t AdaptivePartitionHashJoinBuilder::ht_mem_usage() const {
+    return std::accumulate(_builders.begin(), _builders.end(), 0L,
+                           [](int64_t sum, const auto& builder) { return sum + builder->ht_mem_usage(); });
+}
+
+Status AdaptivePartitionHashJoinBuilder::_convert_to_single_partition() {
+    // merge all partition data to the first partition
+    for (size_t i = 1; i < _partition_num; ++i) {
+        _builders[0]->hash_table().merge_ht(_builders[i]->hash_table());
+    }
+    _builders.resize(1);
+    _partition_num = 1;
+    return Status::OK();
+}
+
+Status AdaptivePartitionHashJoinBuilder::_append_chunk_to_partitions(const ChunkPtr& chunk) {
+    const std::vector<ExprContext*>& build_partition_keys = _hash_joiner.build_expr_ctxs();
+
+    size_t num_rows = chunk->num_rows();
+    size_t num_partitions = _builders.size();
+    size_t num_partition_cols = build_partition_keys.size();
+
+    std::vector<ColumnPtr> partition_columns(num_partition_cols);
+    for (size_t i = 0; i < num_partition_cols; ++i) {
+        ASSIGN_OR_RETURN(partition_columns[i], build_partition_keys[i]->evaluate(chunk.get()));
+    }
+    std::vector<uint32_t> hash_values;
+    {
+        hash_values.assign(num_rows, HashUtil::FNV_SEED);
+
+        for (const ColumnPtr& column : partition_columns) {
+            column->fnv_hash(hash_values.data(), 0, num_rows);
+        }
+        // find partition id
+        for (size_t i = 0; i < hash_values.size(); ++i) {
+            hash_values[i] = HashUtil::fmix32(hash_values[i]) & (num_partitions - 1);
+        }
+    }
+
+    const auto& partitions = hash_values;
+
+    std::vector<uint32_t> selection;
+    selection.resize(chunk->num_rows());
+
+    std::vector<uint32_t> channel_row_idx_start_points;
+    channel_row_idx_start_points.assign(num_partitions + 1, 0);
+
+    for (uint32_t i : partitions) {
+        channel_row_idx_start_points[i]++;
+    }
+
+    for (int32_t i = 1; i <= channel_row_idx_start_points.size() - 1; ++i) {
+        channel_row_idx_start_points[i] += channel_row_idx_start_points[i - 1];
+    }
+
+    for (int32_t i = chunk->num_rows() - 1; i >= 0; --i) {
+        selection[channel_row_idx_start_points[partitions[i]] - 1] = i;
+        channel_row_idx_start_points[partitions[i]]--;
+    }
+
+    for (size_t i = 0; i < num_partitions; ++i) {
+        auto from = channel_row_idx_start_points[i];
+        auto size = channel_row_idx_start_points[i + 1] - from;
+        if (size == 0) {
+            continue;
+        }
+        // TODO: make builder implement append with selective
+        auto partition_chunk = chunk->clone_empty();
+        partition_chunk->append_selective(*chunk, selection.data(), from, size);
+        RETURN_IF_ERROR(_builders[i]->append_chunk(std::move(partition_chunk)));
+    }
+    return Status::OK();
+}
+
+Status AdaptivePartitionHashJoinBuilder::do_append_chunk(const ChunkPtr& chunk) {
+    if (_partition_num > 1 && hash_table_row_count() > _partition_join_max_rows) {
+        RETURN_IF_ERROR(_convert_to_single_partition());
+    }
+
+    if (_partition_num > 1 && ++_pushed_chunks % 8 == 0) {
+        size_t build_row_size = ht_mem_usage() / hash_table_row_count();
+        _adjust_partition_rows(build_row_size);
+        if (_partition_num == 1) {
+            RETURN_IF_ERROR(_convert_to_single_partition());
+        }
+    }
+
+    if (_partition_num > 1) {
+        RETURN_IF_ERROR(_append_chunk_to_partitions(chunk));
+    } else {
+        RETURN_IF_ERROR(_builders[0]->do_append_chunk(chunk));
+    }
+
+    return Status::OK();
+}
+
+ChunkPtr AdaptivePartitionHashJoinBuilder::convert_to_spill_schema(const ChunkPtr& chunk) const {
+    return _builders[0]->convert_to_spill_schema(chunk);
+}
+
+Status AdaptivePartitionHashJoinBuilder::build(RuntimeState* state) {
+    DCHECK_EQ(_partition_num, _builders.size());
+
+    if (_partition_num > 1 && hash_table_row_count() < _partition_join_min_rows) {
+        RETURN_IF_ERROR(_convert_to_single_partition());
+    }
+
+    for (auto& builder : _builders) {
+        RETURN_IF_ERROR(builder->build(state));
+    }
+    _ready = true;
+    return Status::OK();
+}
+
+void AdaptivePartitionHashJoinBuilder::visitHt(const std::function<void(JoinHashTable*)>& visitor) {
+    for (auto& builder : _builders) {
+        builder->visitHt(visitor);
+    }
+}
+
+std::unique_ptr<HashJoinProberImpl> AdaptivePartitionHashJoinBuilder::create_prober() {
+    DCHECK_EQ(_partition_num, _builders.size());
+
+    if (_partition_num == 1) {
+        return _builders[0]->create_prober();
+    } else {
+        std::vector<std::unique_ptr<SingleHashJoinProberImpl>> sub_probers;
+        auto prober = std::make_unique<PartitionedHashJoinProberImpl>(_hash_joiner);
+        sub_probers.resize(_partition_num);
+        for (size_t i = 0; i < _builders.size(); ++i) {
+            sub_probers[i].reset(down_cast<SingleHashJoinProberImpl*>(_builders[i]->create_prober().release()));
+        }
+        prober->set_probers(std::move(sub_probers));
+        return prober;
+    }
+}
+
+void AdaptivePartitionHashJoinBuilder::clone_readable(HashJoinBuilder* builder) {
+    for (auto& sub_builder : _builders) {
+        DCHECK(sub_builder->ready());
+    }
+    DCHECK(_ready);
+    DCHECK_EQ(_partition_num, _builders.size());
+    auto other = down_cast<AdaptivePartitionHashJoinBuilder*>(builder);
+    other->_builders.clear();
+    other->_partition_num = _partition_num;
+    other->_partition_join_max_rows = _partition_join_max_rows;
+    other->_partition_join_min_rows = _partition_join_min_rows;
+    other->_ready = _ready;
+    for (size_t i = 0; i < _partition_num; ++i) {
+        other->_builders.emplace_back(std::make_unique<SingleHashJoinBuilder>(_hash_joiner));
+        _builders[i]->clone_readable(other->_builders[i].get());
+    }
+}
+
+HashJoinBuilder* HashJoinBuilderFactory::create(ObjectPool* pool, const HashJoinBuildOptions& options,
+                                                HashJoiner& hash_joiner) {
+    if (options.enable_partitioned_hash_join) {
+        return pool->add(new AdaptivePartitionHashJoinBuilder(hash_joiner));
+    } else {
+        return pool->add(new SingleHashJoinBuilder(hash_joiner));
+    }
+}
+
 } // namespace starrocks
diff --git a/be/src/exec/hash_join_components.h b/be/src/exec/hash_join_components.h
index 3d1a5b08c2a8d7..a582fb14e7bdb9 100644
--- a/be/src/exec/hash_join_components.h
+++ b/be/src/exec/hash_join_components.h
@@ -31,10 +31,11 @@ class HashJoinProberImpl {
 public:
     virtual ~HashJoinProberImpl() = default;
     virtual bool probe_chunk_empty() const = 0;
+    virtual Status on_input_finished(RuntimeState* state) = 0;
     virtual Status push_probe_chunk(RuntimeState* state, ChunkPtr&& chunk) = 0;
     virtual StatusOr<ChunkPtr> probe_chunk(RuntimeState* state) = 0;
     virtual StatusOr<ChunkPtr> probe_remain(RuntimeState* state, bool* has_remain) = 0;
-    virtual void reset() = 0;
+    virtual void reset(RuntimeState* runtime_state) = 0;
 protected:
     HashJoinProberImpl(HashJoiner& hash_joiner) : _hash_joiner(hash_joiner) {}
@@ -51,6 +52,13 @@ class HashJoinProber {
         return _impl->push_probe_chunk(state, std::move(chunk));
     }
+    Status on_input_finished(RuntimeState* state) {
+        if (_impl == nullptr) {
+            return Status::OK();
+        }
+        return _impl->on_input_finished(state);
+    }
+
     // probe hash table
     StatusOr<ChunkPtr> probe_chunk(RuntimeState* state) { return _impl->probe_chunk(state); }
     StatusOr<ChunkPtr> probe_remain(RuntimeState* state, bool* has_remain) {
         return _impl->probe_remain(state, has_remain);
     }
-    void reset() { return _impl->reset(); }
+    void reset(RuntimeState* runtime_state) { return _impl->reset(runtime_state); }
     HashJoinProber* clone_empty(ObjectPool* pool) { return pool->add(new HashJoinProber(_hash_joiner)); }
@@ -96,10 +104,6 @@ class HashJoinBuilder {
     virtual void reset(const HashTableParam& param) = 0;
-    virtual void reset_probe(RuntimeState* state) = 0;
-
-    virtual HashJoinBuilder* clone_empty(ObjectPool* pool) = 0;
-
     virtual int64_t ht_mem_usage() const = 0;
     // used for check NULL_AWARE_LEFT_ANTI_JOIN build side has null
@@ -118,6 +122,7 @@ class HashJoinBuilder {
     virtual std::unique_ptr<HashJoinProberImpl> create_prober() = 0;
+    // clone readable state to the given builder
     virtual void clone_readable(HashJoinBuilder* builder) = 0;
     virtual ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const = 0;
@@ -148,12 +153,6 @@ class SingleHashJoinBuilder final : public HashJoinBuilder {
     Status build(RuntimeState* state) override;
-    void reset_probe(RuntimeState* state) override;
-
-    SingleHashJoinBuilder* clone_empty(ObjectPool* pool) override {
-        return pool->add(new SingleHashJoinBuilder(_hash_joiner));
-    }
-
     bool anti_join_key_column_has_null() const override;
     int64_t ht_mem_usage() const override { return _ht.mem_usage(); }
@@ -179,4 +178,12 @@ class SingleHashJoinBuilder final : public HashJoinBuilder {
     Columns _key_columns;
 };
+struct HashJoinBuildOptions {
+    bool enable_partitioned_hash_join = false;
+};
+
+class HashJoinBuilderFactory {
+public:
+    static HashJoinBuilder* create(ObjectPool* pool, const HashJoinBuildOptions& options, HashJoiner& hash_joiner);
+};
 } // namespace starrocks
\ No newline at end of file
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 10689bbccd9b25..fd1712e3cf51b8 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -145,6 +145,9 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     if (tnode.hash_join_node.__isset.late_materialization) {
         _enable_late_materialization = tnode.hash_join_node.late_materialization;
     }
+    if (tnode.hash_join_node.__isset.enable_partition_hash_join) {
+        _enable_partition_hash_join = tnode.hash_join_node.enable_partition_hash_join;
+    }
     return Status::OK();
 }
@@ -203,6 +206,7 @@ void HashJoinNode::_init_hash_table_param(HashTableParam* param) {
     param->build_output_slots = _output_slots;
     param->probe_output_slots = _output_slots;
     param->enable_late_materialization = _enable_late_materialization;
+    param->enable_partition_hash_join = _enable_partition_hash_join;
     std::set<SlotId> predicate_slots;
     for (ExprContext* expr_context : _conjunct_ctxs) {
@@ -472,7 +476,8 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
     HashJoinerParam param(pool, _hash_join_node, _is_null_safes, _build_expr_ctxs, _probe_expr_ctxs,
                           _other_join_conjunct_ctxs, _conjunct_ctxs, child(1)->row_desc(), child(0)->row_desc(),
                           child(1)->type(), child(0)->type(), child(1)->conjunct_ctxs().empty(), _build_runtime_filters,
-                          _output_slots, _output_slots, _distribution_mode, false, _enable_late_materialization);
+                          _output_slots, _output_slots, _distribution_mode, false, _enable_late_materialization,
+                          _enable_partition_hash_join);
     auto hash_joiner_factory = std::make_shared<HashJoinerFactory>(param);
     // Create a shared RefCountedRuntimeFilterCollector
diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h
index d83c0b9fed0bec..f7ea19015b24f8 100644
--- a/be/src/exec/hash_join_node.h
+++ b/be/src/exec/hash_join_node.h
@@ -115,6 +115,7 @@ class HashJoinNode final : public ExecNode {
     bool _is_push_down = false;
     bool _enable_late_materialization = false;
+    bool _enable_partition_hash_join = false;
     JoinHashTable _ht;
diff --git a/be/src/exec/hash_joiner.cpp b/be/src/exec/hash_joiner.cpp
index e7ea8a9f37d133..7101254c86b3c3 100644
--- a/be/src/exec/hash_joiner.cpp
+++ b/be/src/exec/hash_joiner.cpp
@@ -44,6 +44,7 @@ void HashJoinProbeMetrics::prepare(RuntimeProfile* runtime_profile) {
     other_join_conjunct_evaluate_timer = ADD_TIMER(runtime_profile, "OtherJoinConjunctEvaluateTime");
     where_conjunct_evaluate_timer = ADD_TIMER(runtime_profile, "WhereConjunctEvaluateTime");
     probe_counter = ADD_COUNTER(runtime_profile, "probeCount", TUnit::UNIT);
+    partition_probe_overhead = ADD_TIMER(runtime_profile, "PartitionProbeOverhead");
 }
 void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) {
@@ -55,8 +56,8 @@ void HashJoinBuildMetrics::prepare(RuntimeProfile* runtime_profile) {
     runtime_filter_num = ADD_COUNTER(runtime_profile, "RuntimeFilterNum", TUnit::UNIT);
     build_keys_per_bucket = ADD_COUNTER(runtime_profile, "BuildKeysPerBucket%", TUnit::UNIT);
     hash_table_memory_usage = ADD_COUNTER(runtime_profile, "HashTableMemoryUsage", TUnit::BYTES);
-    partial_runtime_bloom_filter_bytes = ADD_COUNTER(runtime_profile, "PartialRuntimeBloomFilterBytes", TUnit::BYTES);
+    partition_nums = ADD_COUNTER(runtime_profile, "PartitionNums", TUnit::UNIT);
 }
 HashJoiner::HashJoiner(const HashJoinerParam& param)
@@ -87,7 +88,11 @@ HashJoiner::HashJoiner(const HashJoinerParam& param)
     if (param._hash_join_node.__isset.build_runtime_filters_from_planner) {
         _build_runtime_filters_from_planner = param._hash_join_node.build_runtime_filters_from_planner;
     }
-    _hash_join_builder = _pool->add(new SingleHashJoinBuilder(*this));
+
+    HashJoinBuildOptions build_options;
+    build_options.enable_partitioned_hash_join = param._enable_partition_hash_join;
+
+    _hash_join_builder = HashJoinBuilderFactory::create(_pool, build_options, *this);
     _hash_join_prober = _pool->add(new HashJoinProber(*this));
     _build_metrics = _pool->add(new HashJoinBuildMetrics());
     _probe_metrics = _pool->add(new HashJoinProbeMetrics());
@@ -247,8 +252,8 @@ Status HashJoiner::push_chunk(RuntimeState* state, ChunkPtr&& chunk) {
     return _hash_join_prober->push_probe_chunk(state, std::move(chunk));
 }
-Status HashJoiner::probe_input_finished() {
-    return Status::OK();
+Status HashJoiner::probe_input_finished(RuntimeState* state) {
+    return _hash_join_prober->on_input_finished(state);
 }
 StatusOr<ChunkPtr> HashJoiner::pull_chunk(RuntimeState* state) {
@@ -365,9 +370,7 @@ Status HashJoiner::reset_probe(starrocks::RuntimeState* state) {
         return Status::OK();
     }
-    _hash_join_prober->reset();
-
-    _hash_join_builder->reset_probe(state);
+    _hash_join_prober->reset(state);
     return Status::OK();
 }
diff --git a/be/src/exec/hash_joiner.h b/be/src/exec/hash_joiner.h
index 5d2fa1cc75124b..e671bd6c4b45fa 100644
--- a/be/src/exec/hash_joiner.h
+++ b/be/src/exec/hash_joiner.h
@@ -72,7 +72,7 @@ struct HashJoinerParam {
                     bool build_conjunct_ctxs_is_empty, std::list<RuntimeFilterBuildDescriptor*> build_runtime_filters,
                     std::set<SlotId> build_output_slots, std::set<SlotId> probe_output_slots,
                     const TJoinDistributionMode::type distribution_mode, bool mor_reader_mode,
-                    bool enable_late_materialization)
+                    bool enable_late_materialization, bool enable_partition_hash_join)
             : _pool(pool),
               _hash_join_node(hash_join_node),
               _is_null_safes(std::move(is_null_safes)),
@@ -90,7 +90,8 @@ struct HashJoinerParam {
               _probe_output_slots(std::move(probe_output_slots)),
               _distribution_mode(distribution_mode),
               _mor_reader_mode(mor_reader_mode),
-              _enable_late_materialization(enable_late_materialization) {}
+              _enable_late_materialization(enable_late_materialization),
+              _enable_partition_hash_join(enable_partition_hash_join) {}
     HashJoinerParam(HashJoinerParam&&) = default;
     HashJoinerParam(HashJoinerParam&) = default;
@@ -115,6 +116,7 @@ struct HashJoinerParam {
     const TJoinDistributionMode::type _distribution_mode;
     const bool _mor_reader_mode;
     const bool _enable_late_materialization;
+    const bool _enable_partition_hash_join;
 };
 inline bool could_short_circuit(TJoinOp::type join_type) {
@@ -143,6 +145,7 @@ struct HashJoinProbeMetrics {
     RuntimeProfile::Counter* where_conjunct_evaluate_timer = nullptr;
     RuntimeProfile::Counter* output_build_column_timer = nullptr;
     RuntimeProfile::Counter* probe_counter = nullptr;
+    RuntimeProfile::Counter* partition_probe_overhead = nullptr;
     void prepare(RuntimeProfile* runtime_profile);
 };
@@ -156,8 +159,8 @@ struct HashJoinBuildMetrics {
     RuntimeProfile::Counter* runtime_filter_num = nullptr;
     RuntimeProfile::Counter* build_keys_per_bucket = nullptr;
     RuntimeProfile::Counter* hash_table_memory_usage = nullptr;
-    RuntimeProfile::Counter* partial_runtime_bloom_filter_bytes = nullptr;
+    RuntimeProfile::Counter* partition_nums = nullptr;
     void prepare(RuntimeProfile* runtime_profile);
 };
@@ -210,7 +213,7 @@ class HashJoiner final : public pipeline::ContextWithDependency {
     Status build_ht(RuntimeState* state);
     // probe phase
     Status push_chunk(RuntimeState* state, ChunkPtr&& chunk);
-    Status probe_input_finished();
+    Status probe_input_finished(RuntimeState* state);
     StatusOr<ChunkPtr> pull_chunk(RuntimeState* state);
     pipeline::RuntimeInFilters& get_runtime_in_filters() { return _runtime_in_filters; }
@@ -274,10 +277,16 @@ class HashJoiner final : public pipeline::ContextWithDependency {
         return Status::OK();
     }
-    const std::vector<ExprContext*> probe_expr_ctxs() { return _probe_expr_ctxs; }
+    const std::vector<ExprContext*>& probe_expr_ctxs() { return _probe_expr_ctxs; }
+    const std::vector<ExprContext*>& build_expr_ctxs() { return _build_expr_ctxs; }
     HashJoinProber* new_prober(ObjectPool* pool) { return _hash_join_prober->clone_empty(pool); }
-    HashJoinBuilder* new_builder(ObjectPool* pool) { return _hash_join_builder->clone_empty(pool); }
+    HashJoinBuilder* new_builder(ObjectPool* pool) {
+        // Spill is not supported for partitioned hash join yet.
+        HashJoinBuildOptions options;
+        options.enable_partitioned_hash_join = false;
+        return HashJoinBuilderFactory::create(pool, options, *this);
+    }
     Status filter_probe_output_chunk(ChunkPtr& chunk, JoinHashTable& hash_table) {
         // Probe in JoinHashMap is divided into probe with other_conjuncts and without other_conjuncts.
diff --git a/be/src/exec/join_hash_map.cpp b/be/src/exec/join_hash_map.cpp
index afbe8107586b31..9389cb90e6c66f 100644
--- a/be/src/exec/join_hash_map.cpp
+++ b/be/src/exec/join_hash_map.cpp
@@ -656,6 +656,21 @@ void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_columns) {
     _table_items->row_count += chunk->num_rows();
 }
+void JoinHashTable::merge_ht(const JoinHashTable& ht) {
+    _table_items->row_count += ht._table_items->row_count;
+
+    Columns& columns = _table_items->build_chunk->columns();
+    Columns& other_columns = ht._table_items->build_chunk->columns();
+
+    for (size_t i = 0; i < _table_items->build_column_count; i++) {
+        if (!columns[i]->is_nullable() && other_columns[i]->is_nullable()) {
+            // upgrade to nullable column
+            columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
+        }
+        columns[i]->append(*other_columns[i], 1, other_columns[i]->size() - 1);
+    }
+}
+
 ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const {
     DCHECK(chunk != nullptr && chunk->num_rows() > 0);
     ChunkPtr output = std::make_shared<Chunk>();
diff --git a/be/src/exec/join_hash_map.h b/be/src/exec/join_hash_map.h
index 9f4e5eb05b9396..781a466cd63d62 100644
--- a/be/src/exec/join_hash_map.h
+++ b/be/src/exec/join_hash_map.h
@@ -280,6 +280,7 @@ struct HashTableProbeState {
 struct HashTableParam {
     bool with_other_conjunct = false;
     bool enable_late_materialization = false;
+    bool enable_partition_hash_join = false;
     TJoinOp::type join_type = TJoinOp::INNER_JOIN;
     const RowDescriptor* build_row_desc = nullptr;
     const RowDescriptor* probe_row_desc = nullptr;
@@ -833,6 +834,7 @@ class JoinHashTable {
     Status lazy_output(RuntimeState* state, ChunkPtr* probe_chunk, ChunkPtr* result_chunk);
     void append_chunk(const ChunkPtr& chunk, const Columns& key_columns);
+    void merge_ht(const JoinHashTable& ht);
     // convert input column to spill schema order
     ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const;
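`merge_ht` concatenates another partition's build chunk onto this one when the builder falls back to a single partition. If only the other side's column is nullable, the destination is first upgraded to nullable; the append then starts at offset 1, skipping what appears to be the reserved row 0 of the build chunk. A sketch of the upgrade-then-append idea on toy types (not the real `Column` API):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Toy stand-in for a (possibly nullable) column: values plus an optional
// null bitmap. The real code wraps a Column in NullableColumn instead.
struct ToyColumn {
    std::vector<int64_t> values;
    std::vector<uint8_t> nulls; // empty => not nullable

    bool is_nullable() const { return !nulls.empty(); }

    void upgrade_to_nullable() {
        if (!is_nullable()) {
            nulls.assign(values.size(), 0); // all existing rows become non-null
        }
    }

    // Append rows [from, end) of other, mirroring Column::append(src, offset, count).
    void append_from(const ToyColumn& other, size_t from) {
        values.insert(values.end(), other.values.begin() + from, other.values.end());
        if (is_nullable()) {
            if (other.is_nullable()) {
                nulls.insert(nulls.end(), other.nulls.begin() + from, other.nulls.end());
            } else {
                nulls.insert(nulls.end(), other.values.size() - from, 0);
            }
        }
    }
};

// Merge like JoinHashTable::merge_ht: upgrade first if only `other` is
// nullable, then append starting at row 1.
void merge_column(ToyColumn& dst, const ToyColumn& other) {
    if (!dst.is_nullable() && other.is_nullable()) {
        dst.upgrade_to_nullable();
    }
    dst.append_from(other, /*from=*/1);
}

int main() {
    ToyColumn dst{{0, 10, 20}, {}};          // not nullable
    ToyColumn other{{0, 30, 40}, {1, 0, 1}}; // nullable
    merge_column(dst, other);
    std::cout << "rows=" << dst.values.size() << " nullable=" << dst.is_nullable() << '\n';
}
```

The row-count bookkeeping in the real function (`row_count += ht.row_count`) relies on every partition's build chunk carrying the same extra first row, so only payload rows are copied across.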
TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HDFS_SCAN_NODE, true, std::list(), std::set(), probe_output_slot_ids, - TJoinDistributionMode::PARTITIONED, true, false)); + TJoinDistributionMode::PARTITIONED, true, false, false)); _hash_joiner = _pool.add(new HashJoiner(*param)); RETURN_IF_ERROR(_hash_joiner->prepare_builder(runtime_state, _runtime_profile)); diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 5e6a0945dec2ea..500cba44b14b1d 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -108,14 +108,6 @@ Status ParquetScanner::append_batch_to_src_chunk(ChunkPtr* chunk) { _conv_ctx.current_slot = slot_desc; auto* array = _batch->column(column_pos++).get(); auto& column = (*chunk)->get_column_by_slot_id(slot_desc->id()); - // for timestamp type, _state->timezone which is specified by user. convert function - // obtains timezone from array. thus timezone in array should be rectified to - // _state->timezone. - if (array->type_id() == ArrowTypeId::TIMESTAMP) { - auto* timestamp_type = down_cast(array->type().get()); - auto& mutable_timezone = (std::string&)timestamp_type->timezone(); - mutable_timezone = _state->timezone(); - } RETURN_IF_ERROR(convert_array_to_column(_conv_funcs[i].get(), num_elements, array, column, _batch_start_idx, _chunk_start_idx, &_chunk_filter, &_conv_ctx)); } @@ -321,6 +313,15 @@ Status ParquetScanner::convert_array_to_column(ConvertFuncTree* conv_func, size_ const arrow::Array* array, const ColumnPtr& column, size_t batch_start_idx, size_t chunk_start_idx, Filter* chunk_filter, ArrowConvertContext* conv_ctx) { + // for timestamp type, state->timezone which is specified by user. convert function + // obtains timezone from array. thus timezone in array should be rectified to + // state->timezone. + if (array->type_id() == ArrowTypeId::TIMESTAMP) { + auto* timestamp_type = down_cast(array->type().get()); + auto& mutable_timezone = (std::string&)timestamp_type->timezone(); + mutable_timezone = conv_ctx->state->timezone(); + } + uint8_t* null_data; Column* data_column; if (column->is_nullable()) { diff --git a/be/src/exec/parquet_schema_builder.cpp b/be/src/exec/parquet_schema_builder.cpp index 8a3ac6bc1f9ead..f582520ed9da59 100644 --- a/be/src/exec/parquet_schema_builder.cpp +++ b/be/src/exec/parquet_schema_builder.cpp @@ -23,6 +23,7 @@ static Status get_parquet_type_from_group(const ::parquet::schema::NodePtr& node static Status get_parquet_type_from_primitive(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc); static Status get_parquet_type_from_list(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc); static Status get_parquet_type_from_map(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc); +static Status try_to_infer_struct_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc); Status get_parquet_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc) { if (node->is_group()) { @@ -121,6 +122,11 @@ static Status get_parquet_type_from_group(const ::parquet::schema::NodePtr& node return get_parquet_type_from_map(node, type_desc); } + auto st = try_to_infer_struct_type(node, type_desc); + if (st.ok()) { + return Status::OK(); + } + // Treat unsupported types as VARCHAR. 
*type_desc = TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH); return Status::OK(); @@ -217,4 +223,44 @@ static Status get_parquet_type_from_map(const ::parquet::schema::NodePtr& node, return Status::OK(); } +/* +Try to infer a struct type from a group node. + +Parquet has no dedicated struct type, so a struct never appears explicitly in a Parquet schema. +Instead, infer one from a group of the following shape: +group { + type field0; + type field1; + ... +} +*/ +static Status try_to_infer_struct_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc) { + // 1st level. + // group name + DCHECK(node->is_group()); + + auto group_node = std::static_pointer_cast<::parquet::schema::GroupNode>(node); + int field_count = group_node->field_count(); + if (field_count == 0) { + return Status::Unknown("unknown type"); + } + + // 2nd level. + // field + std::vector<std::string> field_names; + std::vector<TypeDescriptor> field_types; + field_names.reserve(field_count); + field_types.reserve(field_count); + for (auto i = 0; i < group_node->field_count(); ++i) { + const auto& field = group_node->field(i); + field_names.emplace_back(field->name()); + auto& field_type_desc = field_types.emplace_back(); + RETURN_IF_ERROR(get_parquet_type(field, &field_type_desc)); + } + + *type_desc = TypeDescriptor::create_struct_type(field_names, field_types); + + return Status::OK(); +} + } //namespace starrocks \ No newline at end of file diff --git a/be/src/exec/pipeline/hashjoin/hash_join_probe_operator.cpp b/be/src/exec/pipeline/hashjoin/hash_join_probe_operator.cpp index 61158cf0e69ad9..7b4225054fef03 100644 --- a/be/src/exec/pipeline/hashjoin/hash_join_probe_operator.cpp +++ b/be/src/exec/pipeline/hashjoin/hash_join_probe_operator.cpp @@ -87,7 +87,7 @@ StatusOr<ChunkPtr> HashJoinProbeOperator::pull_chunk(RuntimeState* state) { } Status HashJoinProbeOperator::set_finishing(RuntimeState* state) { - RETURN_IF_ERROR(_join_prober->probe_input_finished()); + RETURN_IF_ERROR(_join_prober->probe_input_finished(state)); _join_prober->enter_post_probe_phase(); return Status::OK(); } diff --git a/be/src/exec/pipeline/pipeline_driver.cpp b/be/src/exec/pipeline/pipeline_driver.cpp index df6ff53a5fd88a..114769e0c5f3d6 100644 --- a/be/src/exec/pipeline/pipeline_driver.cpp +++ b/be/src/exec/pipeline/pipeline_driver.cpp @@ -555,7 +555,9 @@ void PipelineDriver::finish_operators(RuntimeState* runtime_state) { void PipelineDriver::cancel_operators(RuntimeState* runtime_state) { if (this->query_ctx()->is_query_expired()) { - LOG(WARNING) << "begin to cancel operators for " << to_readable_string(); + if (_has_log_cancelled.exchange(true) == false) { + VLOG_ROW << "begin to cancel operators for " << to_readable_string(); + } } for (auto& op : _operators) { WARN_IF_ERROR(_mark_operator_cancelled(op, runtime_state), @@ -805,10 +807,11 @@ Status PipelineDriver::_mark_operator_finished(OperatorPtr& op, RuntimeState* st Status PipelineDriver::_mark_operator_cancelled(OperatorPtr& op, RuntimeState* state) { Status res = _mark_operator_finished(op, state); - if (!res.ok()) { - LOG(WARNING) << fmt::format("fragment_id {} driver {} cancels operator {} with finished error {}", - print_id(state->fragment_instance_id()), to_readable_string(), op->get_name(), - res.message()); + if (!res.ok() && !res.is_cancelled()) { + LOG(WARNING) << fmt::format( + "[Driver] failed to finish operator while cancelling [fragment_id={}] [driver={}] " + "[operator={}] [error={}]", + print_id(state->fragment_instance_id()), to_readable_string(), op->get_name(), res.message()); } auto& op_state =
_operator_stages[op->get_id()]; if (op_state >= OperatorStage::CANCELLED) { diff --git a/be/src/exec/pipeline/pipeline_driver.h b/be/src/exec/pipeline/pipeline_driver.h index 4ffa5a56985cde..5f9d1c8b0dcd15 100644 --- a/be/src/exec/pipeline/pipeline_driver.h +++ b/be/src/exec/pipeline/pipeline_driver.h @@ -526,6 +526,8 @@ class PipelineDriver { size_t _driver_queue_level = 0; std::atomic<bool> _in_ready_queue{false}; + std::atomic<bool> _has_log_cancelled{false}; + // metrics RuntimeProfile::Counter* _total_timer = nullptr; RuntimeProfile::Counter* _active_timer = nullptr; diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp index d70c18d40b5c8d..fc8aa6598cca3a 100644 --- a/be/src/exec/pipeline/query_context.cpp +++ b/be/src/exec/pipeline/query_context.cpp @@ -375,8 +375,8 @@ QueryContext* QueryContextManager::get_or_register(const TUniqueId& query_id) { // lookup query context for the second chance in sc_map if (sc_it != sc_map.end()) { auto ctx = std::move(sc_it->second); - RETURN_NULL_IF_CTX_CANCELLED(ctx); sc_map.erase(sc_it); + RETURN_NULL_IF_CTX_CANCELLED(ctx); auto* raw_ctx_ptr = ctx.get(); context_map.emplace(query_id, std::move(ctx)); return raw_ctx_ptr; diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index 0a9a0bcbd2b469..dbe96de9139f3c 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -372,8 +372,9 @@ pipeline::OpFactories TopNNode::decompose_to_pipeline(pipeline::PipelineBuilderC bool is_partition_skewed = _tnode.sort_node.__isset.analytic_partition_skewed && _tnode.sort_node.analytic_partition_skewed; bool need_merge = _analytic_partition_exprs.empty() || is_partition_skewed; - bool enable_parallel_merge = - _tnode.sort_node.__isset.enable_parallel_merge && _tnode.sort_node.enable_parallel_merge; + bool enable_parallel_merge = _tnode.sort_node.__isset.enable_parallel_merge && + _tnode.sort_node.enable_parallel_merge && + !_sort_exec_exprs.lhs_ordering_expr_ctxs().empty(); OpFactories operators_source_with_sort; diff --git a/be/src/exprs/agg/array_agg.h b/be/src/exprs/agg/array_agg.h index e4520550a1e299..ac15f44573d2ba 100644 --- a/be/src/exprs/agg/array_agg.h +++ b/be/src/exprs/agg/array_agg.h @@ -94,19 +94,12 @@ struct ArrayAggAggregateState { return &data_column; } - bool check_overflow(FunctionContext* ctx) const { - std::string err_msg; - if (UNLIKELY(data_column.capacity_limit_reached(&err_msg))) { - ctx->set_error(fmt::format("The column generated by array_agg is overflow: {}", err_msg).c_str()); - return true; - } - return false; - } + bool check_overflow(FunctionContext* ctx) const { return check_overflow(data_column, ctx); } static bool check_overflow(const Column& col, FunctionContext* ctx) { - std::string err_msg; - if (UNLIKELY(col.capacity_limit_reached(&err_msg))) { - ctx->set_error(fmt::format("The column generated by array_agg is overflow: {}", err_msg).c_str()); + Status st = col.capacity_limit_reached(); + if (!st.ok()) { + ctx->set_error(fmt::format("The column generated by array_agg overflowed: {}", st.message()).c_str()); return true; } return false; @@ -195,8 +188,9 @@ struct ArrayAggAggregateStateV2 { bool check_overflow(FunctionContext* ctx) const { std::string err_msg; for (size_t i = 0; i < data_columns.size(); i++) { - if (UNLIKELY(data_columns[i]->capacity_limit_reached(&err_msg))) { - ctx->set_error(fmt::format("The column generated by array_agg is overflow: {}", err_msg).c_str()); + Status st = data_columns[i]->capacity_limit_reached(); + if (!st.ok()) { +
ctx->set_error(fmt::format("The column generated by array_agg is overflow: {}", st.message()).c_str()); return true; } } @@ -204,9 +198,9 @@ struct ArrayAggAggregateStateV2 { } static bool check_overflow(const Column& col, FunctionContext* ctx) { - std::string err_msg; - if (UNLIKELY(col.capacity_limit_reached(&err_msg))) { - ctx->set_error(fmt::format("The column generated by array_agg is overflow: {}", err_msg).c_str()); + Status st = col.capacity_limit_reached(); + if (!st.ok()) { + ctx->set_error(fmt::format("The column generated by array_agg is overflow: {}", st.message()).c_str()); return true; } return false; diff --git a/be/src/exprs/bitmap_functions.cpp b/be/src/exprs/bitmap_functions.cpp index e4a2b97c1ffd9f..ff34607b279030 100644 --- a/be/src/exprs/bitmap_functions.cpp +++ b/be/src/exprs/bitmap_functions.cpp @@ -700,13 +700,8 @@ StatusOr BitmapFunctions::bitmap_to_binary(FunctionContext* context, } ColumnPtr col = builder.build(ColumnHelper::is_all_const(columns)); - std::string err_msg; - if (col->capacity_limit_reached(&err_msg)) { - return Status::InternalError( - strings::Substitute("Size of binary column generated by bitmap_to_binary reaches limit: $0", err_msg)); - } else { - return col; - } + RETURN_IF_ERROR(col->capacity_limit_reached()); + return col; } StatusOr BitmapFunctions::bitmap_from_binary(FunctionContext* context, const Columns& columns) { diff --git a/be/src/exprs/debug_expr.cpp b/be/src/exprs/debug_expr.cpp index b1d0098906f800..5d4b260c96c8ec 100644 --- a/be/src/exprs/debug_expr.cpp +++ b/be/src/exprs/debug_expr.cpp @@ -45,12 +45,7 @@ StatusOr DebugFunctions::chunk_check_valid(ExprContext* context, Chun size_t num_rows = ptr->num_rows(); for (const auto& column : ptr->columns()) { // check column size capacity - std::string msg; - column->capacity_limit_reached(&msg); - if (!msg.empty()) { - DCHECK(false) << "not expected"; - throw std::runtime_error(msg); - } + RETURN_IF_ERROR(column->capacity_limit_reached()); // check column size matched if (column->size() != num_rows) { DCHECK(false) << "not expected"; diff --git a/be/src/exprs/function_call_expr.cpp b/be/src/exprs/function_call_expr.cpp index 988c05d4896cc4..b27fe037bb6a61 100644 --- a/be/src/exprs/function_call_expr.cpp +++ b/be/src/exprs/function_call_expr.cpp @@ -192,11 +192,7 @@ StatusOr VectorizedFunctionCallExpr::evaluate_checked(starrocks::Expr } RETURN_IF_ERROR(result); if (_fn_desc->check_overflow) { - std::string err_msg; - if (UNLIKELY(result.value()->capacity_limit_reached(&err_msg))) { - return Status::InternalError( - fmt::format("Result column of function {} exceed limit: {}", _fn_desc->name, err_msg)); - } + RETURN_IF_ERROR(result.value()->capacity_limit_reached()); } // For no args function call (pi, e) diff --git a/be/src/exprs/like_predicate.cpp b/be/src/exprs/like_predicate.cpp index 84aecb671e7c37..813d3bfbf96460 100644 --- a/be/src/exprs/like_predicate.cpp +++ b/be/src/exprs/like_predicate.cpp @@ -447,6 +447,48 @@ StatusOr LikePredicate::_predicate_const_regex(FunctionContext* conte return result->build(value_column->is_constant()); } +enum class FastPathType { + EQUALS = 0, + START_WITH = 1, + END_WITH = 2, + SUBSTRING = 3, + REGEX = 4, +}; + +FastPathType extract_fast_path(const Slice& pattern) { + if (pattern.empty() || pattern.size < 2) { + return FastPathType::REGEX; + } + + if (pattern.data[0] == '_' || pattern.data[pattern.size - 1] == '_') { + return FastPathType::REGEX; + } + + bool is_end_with = pattern.data[0] == '%'; + bool is_start_with = pattern.data[pattern.size - 1] 
== '%'; + for (size_t i = 1; i < pattern.size - 1;) { + if (pattern.data[i] == '\\') { + i += 2; + } else { + if (pattern.data[i] == '%' || pattern.data[i] == '_') { + return FastPathType::REGEX; + } + i++; + } + } + + if (is_end_with && is_start_with) { + return FastPathType::SUBSTRING; + } else if (is_end_with) { + return FastPathType::END_WITH; + } else if (is_start_with) { + return FastPathType::START_WITH; + } else { + return FastPathType::EQUALS; + } +} + StatusOr<ColumnPtr> LikePredicate::regex_match_full(FunctionContext* context, const starrocks::Columns& columns) { const auto& value_column = VECTORIZED_FN_ARGS(0); const auto& pattern_column = VECTORIZED_FN_ARGS(1); @@ -478,18 +520,56 @@ StatusOr<ColumnPtr> LikePredicate::regex_match_full(FunctionContext* context, co continue; } - auto re_pattern = LikePredicate::template convert_like_pattern(context, pattern_viewer.value(row)); - - re2::RE2 re(re_pattern, opts); - - if (!re.ok()) { - context->set_error(strings::Substitute("Invalid regex: $0", re_pattern).c_str()); - result.append_null(); - continue; + Slice pattern = pattern_viewer.value(row); + FastPathType val = extract_fast_path(pattern); + switch (val) { + case FastPathType::EQUALS: { + std::string str_pattern = pattern.to_string(); + remove_escape_character(&str_pattern); + result.append(value_viewer.value(row) == str_pattern); + break; } + case FastPathType::START_WITH: { + std::string str_pattern = pattern.to_string(); + remove_escape_character(&str_pattern); + auto pattern_slice = Slice(str_pattern); + pattern_slice.remove_suffix(1); + result.append(ConstantStartsImpl::apply(value_viewer.value(row), pattern_slice)); + break; + } + case FastPathType::END_WITH: { + std::string str_pattern = pattern.to_string(); + remove_escape_character(&str_pattern); + auto pattern_slice = Slice(str_pattern); + pattern_slice.remove_prefix(1); + result.append(ConstantEndsImpl::apply(value_viewer.value(row), pattern_slice)); + break; + } + case FastPathType::SUBSTRING: { + std::string str_pattern = pattern.to_string(); + remove_escape_character(&str_pattern); + auto pattern_slice = Slice(str_pattern); + pattern_slice.remove_prefix(1); + pattern_slice.remove_suffix(1); + auto searcher = LibcASCIICaseSensitiveStringSearcher(pattern_slice.get_data(), pattern_slice.get_size()); + /// searcher returns a pointer to the found substring or to the end of `haystack`.
+ const Slice& value = value_viewer.value(row); + const char* res_pointer = searcher.search(value.data, value.size); + result.append(!!res_pointer); + break; + } + case FastPathType::REGEX: { + auto re_pattern = LikePredicate::template convert_like_pattern(context, pattern); - auto v = RE2::FullMatch(re2::StringPiece(value_viewer.value(row).data, value_viewer.value(row).size), re); - result.append(v); + re2::RE2 re(re_pattern, opts); + if (!re.ok()) { + return Status::InvalidArgument(strings::Substitute("Invalid regex: $0", re_pattern)); + } + auto v = RE2::FullMatch(re2::StringPiece(value_viewer.value(row).data, value_viewer.value(row).size), re); + result.append(v); + break; + } + } } return result.build(all_const); diff --git a/be/src/exprs/string_functions.cpp b/be/src/exprs/string_functions.cpp index 799953d67628ce..63dc49ac6ec657 100644 --- a/be/src/exprs/string_functions.cpp +++ b/be/src/exprs/string_functions.cpp @@ -56,7 +56,7 @@ namespace starrocks { static const RE2 SUBSTRING_RE(R"((?:\.\*)*([^\.\^\{\[\(\|\)\]\}\+\*\?\$\\]+)(?:\.\*)*)", re2::RE2::Quiet); #define THROW_RUNTIME_ERROR_IF_EXCEED_LIMIT(col, func_name) \ - if (UNLIKELY(col->capacity_limit_reached())) { \ + if (UNLIKELY(!col->capacity_limit_reached().ok())) { \ col->reset_column(); \ throw std::runtime_error("binary column exceed 4G in function " #func_name); \ } diff --git a/be/src/exprs/table_function/subdivide_bitmap.h b/be/src/exprs/table_function/subdivide_bitmap.h index b55f63422c40c7..7617f6421c0b76 100644 --- a/be/src/exprs/table_function/subdivide_bitmap.h +++ b/be/src/exprs/table_function/subdivide_bitmap.h @@ -105,17 +105,16 @@ class SubdivideBitmap final : public TableFunction { &compact_offset); } } - std::string err_msg; - auto ret = dst_bitmap_col->capacity_limit_reached(&err_msg); - if (ret) { + Status st = dst_bitmap_col->capacity_limit_reached(); + if (!st.ok()) { state->set_status(Status::InternalError( - fmt::format("Bitmap column generate by subdivide_bitmap reach limit, {}", err_msg))); + fmt::format("Bitmap column generated by subdivide_bitmap reaches the limit, {}", st.message()))); return {}; } - ret = dst_offset_col->capacity_limit_reached(&err_msg); - if (ret) { + st = dst_offset_col->capacity_limit_reached(); + if (!st.ok()) { state->set_status(Status::InternalError( - fmt::format("Offset column generate by subdivide_bitmap reach limit, {}", err_msg))); + fmt::format("Offset column generated by subdivide_bitmap reaches the limit, {}", st.message()))); return {}; } dst_columns.emplace_back(std::move(dst_bitmap_col)); diff --git a/be/src/formats/disk_range.hpp b/be/src/formats/disk_range.hpp index 90641ee320566b..ced8c97c8bdbc2 100644 --- a/be/src/formats/disk_range.hpp +++ b/be/src/formats/disk_range.hpp @@ -24,7 +24,7 @@ class DiskRange { public: DiskRange(const int64_t off, const int64_t len) : _offset(off), _length(len) { DCHECK(off >= 0); - DCHECK(len > 0); + DCHECK(len >= 0); } /** diff --git a/be/src/runtime/current_thread.h b/be/src/runtime/current_thread.h index 9f56f4da9e8309..5c4512484f11c4 100644 --- a/be/src/runtime/current_thread.h +++ b/be/src/runtime/current_thread.h @@ -93,6 +93,7 @@ class CurrentThread { _reserved_bytes = prev_reserved; _cache_size -= size; _allocated_cache_size -= size; + _total_consumed_bytes -= size; _try_consume_mem_size = size; }; if (_cache_size >= BATCH_SIZE) { @@ -132,6 +133,7 @@ class CurrentThread { } else { _cache_size -= size; _allocated_cache_size -= size; + _total_consumed_bytes -= size; _try_consume_mem_size = size; tls_exceed_mem_tracker = limit_tracker;
return false; @@ -162,6 +164,7 @@ class CurrentThread { void release(int64_t size) { _cache_size -= size; _deallocated_cache_size += size; + _total_consumed_bytes -= size; if (_cache_size <= -BATCH_SIZE) { commit(false); } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 68070aa0ad1193..132f191b180adb 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -301,7 +301,11 @@ StatusOr get_txn_status(const AuthInfo& auth, std::str bool wait_txn_visible_until(const AuthInfo& auth, std::string_view db, std::string_view table, int64_t txn_id, int64_t deadline) { while (deadline > UnixSeconds()) { - sleep(std::min((int64_t)config::get_txn_status_internal_sec, deadline - UnixSeconds())); + auto wait_seconds = std::min((int64_t)config::get_txn_status_internal_sec, deadline - UnixSeconds()); + LOG(WARNING) << "transaction is not visible now, will wait " << wait_seconds + << " seconds before retrieving the status again, txn_id: " << txn_id; + // The following sleep might add delay to the total commit-and-publish time + sleep(wait_seconds); auto status_or = get_txn_status(auth, db, table, txn_id); if (!status_or.ok()) { return false; diff --git a/be/src/storage/kv_store.cpp b/be/src/storage/kv_store.cpp index 1f8001efd8b948..0fe04ccdbe1f33 100644 --- a/be/src/storage/kv_store.cpp +++ b/be/src/storage/kv_store.cpp @@ -341,17 +341,9 @@ std::string KVStore::get_root_path() { Status KVStore::OptDeleteRange(ColumnFamilyIndex column_family_index, const std::string& begin_key, const std::string& end_key, WriteBatch* batch) { rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; - int key_cnt = 0; return iterate_range(column_family_index, begin_key, end_key, [&](std::string_view key, std::string_view value) -> StatusOr<bool> { - if (key_cnt >= config::rocksdb_opt_delete_range_limit) { - // fallback and use `DeleteRange` instead.
- batch->Clear(); - RETURN_ERROR_IF_FALSE(batch->DeleteRange(handle, begin_key, end_key).ok()); - return false; - } - batch->Delete(handle, key); - key_cnt++; + RETURN_ERROR_IF_FALSE(batch->Delete(handle, key).ok()); return true; }); } diff --git a/be/src/storage/memtable.cpp b/be/src/storage/memtable.cpp index 77d8f5008c9642..434b5d86318e0f 100644 --- a/be/src/storage/memtable.cpp +++ b/be/src/storage/memtable.cpp @@ -324,10 +324,9 @@ Status MemTable::flush(SegmentPB* seg_info) { if (UNLIKELY(_result_chunk == nullptr)) { return Status::OK(); } - std::string msg; - if (_result_chunk->capacity_limit_reached(&msg)) { - return Status::InternalError( - fmt::format("memtable of tablet {} reache the capacity limit, detail msg: {}", _tablet_id, msg)); + if (auto st = _result_chunk->capacity_limit_reached(); !st.ok()) { + return Status::InternalError(fmt::format("memtable of tablet {} reaches the capacity limit, detail msg: {}", + _tablet_id, st.message())); } auto scope = IOProfiler::scope(IOProfiler::TAG_LOAD, _tablet_id); int64_t duration_ns = 0; diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index 5e4e163a1ff4c2..b9186a7921247f 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -939,8 +939,8 @@ void* StorageEngine::_schedule_apply_thread_callback(void* arg) { auto time_point = std::chrono::steady_clock::now(); while (!_bg_worker_stopped.load(std::memory_order_consume) && !_schedule_apply_tasks.empty() && _schedule_apply_tasks.top().first <= time_point) { - _schedule_apply_tasks.pop(); auto tablet_id = _schedule_apply_tasks.top().second; + _schedule_apply_tasks.pop(); auto tablet = _tablet_manager->get_tablet(tablet_id); if (tablet == nullptr || tablet->updates() == nullptr) { continue; diff --git a/be/src/storage/rowset/column_iterator.h b/be/src/storage/rowset/column_iterator.h index a168688e47fba8..e28c5be6ea94f1 100644 --- a/be/src/storage/rowset/column_iterator.h +++ b/be/src/storage/rowset/column_iterator.h @@ -61,6 +61,8 @@ struct ColumnIteratorOptions { // reader statistics OlapReaderStatistics* stats = nullptr; bool use_page_cache = false; + // temporary data must not be cached + bool temporary_data = false; LakeIOOptions lake_io_opts{.fill_data_cache = true}; // check whether column pages are all dictionary encoding.
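Note on the new flag: `temporary_data` is threaded from SegmentReadOptions through ColumnIteratorOptions into the index readers so that short-lived reads, such as the auto-increment segment rewrites further below, never populate or pin the page cache. A minimal, self-contained sketch of this gating pattern, using stand-in structs rather than the actual StarRocks types (`Config`, `ReadOptions`, and `make_index_read_options` here are illustrative only):

    #include <iostream>

    // Stand-ins for the relevant config knobs (the real ones live in be/src/common/config.h).
    struct Config {
        bool enable_ordinal_index_memory_page_cache = false;
        bool disable_storage_page_cache = false;
    };

    // Stand-in for ColumnIteratorOptions / SegmentReadOptions.
    struct ReadOptions {
        bool temporary_data = false; // temporary data must not be cached
    };

    struct IndexReadOptions {
        bool use_page_cache = false;
        bool kept_in_memory = false;
    };

    // Mirrors the gating added in ScalarColumnIterator::init(): a temporary read
    // disables both caching and pinning, regardless of the global settings.
    IndexReadOptions make_index_read_options(const ReadOptions& opts, const Config& cfg) {
        IndexReadOptions index_opts;
        index_opts.use_page_cache = !opts.temporary_data &&
                (cfg.enable_ordinal_index_memory_page_cache || !cfg.disable_storage_page_cache);
        index_opts.kept_in_memory = !opts.temporary_data && cfg.enable_ordinal_index_memory_page_cache;
        return index_opts;
    }

    int main() {
        Config cfg;
        ReadOptions tmp;
        tmp.temporary_data = true;
        // Prints "false": a temporary read never uses the page cache, even though
        // the default config would otherwise allow it.
        std::cout << std::boolalpha << make_index_read_options(tmp, cfg).use_page_cache << '\n';
        return 0;
    }

Making the decision once, at option-construction time, means every downstream index reader inherits it without extra plumbing.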
diff --git a/be/src/storage/rowset/scalar_column_iterator.cpp b/be/src/storage/rowset/scalar_column_iterator.cpp index 1b2b46e02b3843..481e4ca1f20a4e 100644 --- a/be/src/storage/rowset/scalar_column_iterator.cpp +++ b/be/src/storage/rowset/scalar_column_iterator.cpp @@ -52,8 +52,9 @@ Status ScalarColumnIterator::init(const ColumnIteratorOptions& opts) { _opts = opts; IndexReadOptions index_opts; - index_opts.use_page_cache = config::enable_ordinal_index_memory_page_cache || !config::disable_storage_page_cache; - index_opts.kept_in_memory = config::enable_ordinal_index_memory_page_cache; + index_opts.use_page_cache = !opts.temporary_data && + (config::enable_ordinal_index_memory_page_cache || !config::disable_storage_page_cache); + index_opts.kept_in_memory = !opts.temporary_data && config::enable_ordinal_index_memory_page_cache; index_opts.lake_io_opts = opts.lake_io_opts; index_opts.read_file = _opts.read_file; index_opts.stats = _opts.stats; @@ -381,8 +382,9 @@ Status ScalarColumnIterator::get_row_ranges_by_zone_map(const std::vectorid()] = field->uid(); } - RETURN_IF_ERROR(_bitmap_index_evaluator.init([&cid_2_ucid, - this](ColumnId cid) -> StatusOr<BitmapIndexIterator*> { - const ColumnUID ucid = cid_2_ucid[cid]; - // the column's index in this segment file - ASSIGN_OR_RETURN(std::shared_ptr<Segment> segment_ptr, _get_dcg_segment(ucid)); - if (segment_ptr == nullptr) { - // find segment from delta column group failed, using main segment - segment_ptr = _segment; - } + RETURN_IF_ERROR( + _bitmap_index_evaluator.init([&cid_2_ucid, this](ColumnId cid) -> StatusOr<BitmapIndexIterator*> { + const ColumnUID ucid = cid_2_ucid[cid]; + // the column's index in this segment file + ASSIGN_OR_RETURN(std::shared_ptr<Segment> segment_ptr, _get_dcg_segment(ucid)); + if (segment_ptr == nullptr) { + // find segment from delta column group failed, using main segment + segment_ptr = _segment; + } - IndexReadOptions opts; - opts.use_page_cache = config::enable_bitmap_index_memory_page_cache || !config::disable_storage_page_cache; - opts.kept_in_memory = config::enable_bitmap_index_memory_page_cache; - opts.lake_io_opts = _opts.lake_io_opts; - opts.read_file = _column_files[cid].get(); - opts.stats = _opts.stats; - - BitmapIndexIterator* bitmap_iter = nullptr; - RETURN_IF_ERROR(segment_ptr->new_bitmap_index_iterator(ucid, opts, &bitmap_iter)); - return bitmap_iter; - })); + IndexReadOptions opts; + opts.use_page_cache = !_opts.temporary_data && (config::enable_bitmap_index_memory_page_cache || + !config::disable_storage_page_cache); + opts.kept_in_memory = !_opts.temporary_data && config::enable_bitmap_index_memory_page_cache; + opts.lake_io_opts = _opts.lake_io_opts; + opts.read_file = _column_files[cid].get(); + opts.stats = _opts.stats; + + BitmapIndexIterator* bitmap_iter = nullptr; + RETURN_IF_ERROR(segment_ptr->new_bitmap_index_iterator(ucid, opts, &bitmap_iter)); + return bitmap_iter; + })); RETURN_IF(!_bitmap_index_evaluator.has_bitmap_index(), Status::OK()); } diff --git a/be/src/storage/rowset/segment_options.cpp b/be/src/storage/rowset/segment_options.cpp index 84c98fdf41804b..fb28c7effacd78 100644 --- a/be/src/storage/rowset/segment_options.cpp +++ b/be/src/storage/rowset/segment_options.cpp @@ -63,6 +63,7 @@ Status SegmentReadOptions::convert_to(SegmentReadOptions* dst, const std::vector dst->fs = fs; dst->stats = stats; dst->use_page_cache = use_page_cache; + dst->temporary_data = temporary_data; dst->profile = profile; dst->global_dictmaps = global_dictmaps; dst->rowid_range_option = rowid_range_option; diff --git
a/be/src/storage/rowset/segment_options.h b/be/src/storage/rowset/segment_options.h index 4e954b7bf3361c..2d8de90c80fab0 100644 --- a/be/src/storage/rowset/segment_options.h +++ b/be/src/storage/rowset/segment_options.h @@ -73,6 +73,8 @@ class SegmentReadOptions { RuntimeProfile* profile = nullptr; bool use_page_cache = false; + // temporary data must not be cached + bool temporary_data = false; LakeIOOptions lake_io_opts{.fill_data_cache = true}; ReaderType reader_type = READER_QUERY; diff --git a/be/src/storage/rowset/segment_rewriter.cpp b/be/src/storage/rowset/segment_rewriter.cpp index d545124e40c9f8..c352b12edee319 100644 --- a/be/src/storage/rowset/segment_rewriter.cpp +++ b/be/src/storage/rowset/segment_rewriter.cpp @@ -129,6 +129,7 @@ Status SegmentRewriter::rewrite_auto_increment(const std::string& src_path, cons seg_options.fs = fs; seg_options.stats = &stats; seg_options.chunk_size = num_rows; + seg_options.temporary_data = true; auto res = rowset->segments()[segment_id]->new_iterator(src_schema, seg_options); auto& itr = res.value(); @@ -225,6 +226,7 @@ Status SegmentRewriter::rewrite_auto_increment_lake( seg_options.fs = fs; seg_options.stats = &stats; seg_options.chunk_size = num_rows; + seg_options.temporary_data = true; // Read data from the (partial) segment file generated by this import task ASSIGN_OR_RETURN(auto itr, segment->new_iterator(src_schema, seg_options)); diff --git a/be/src/storage/rowset_column_update_state.cpp b/be/src/storage/rowset_column_update_state.cpp index 0ce26499e3da8b..b6be164f1d2b77 100644 --- a/be/src/storage/rowset_column_update_state.cpp +++ b/be/src/storage/rowset_column_update_state.cpp @@ -300,25 +300,23 @@ Status RowsetColumnUpdateState::_finalize_partial_update_state(Tablet* tablet, R return Status::OK(); } -static int64_t calc_upt_memory_usage_per_row_column(Rowset* rowset) { - const auto& txn_meta = rowset->rowset_meta()->get_meta_pb_without_schema().txn_meta(); - const int64_t total_update_col_cnt = txn_meta.partial_update_column_ids_size(); - // `num_rows_upt` and `total_update_row_size` could be zero when upgrade from old version, +int64_t RowsetColumnUpdateState::calc_upt_memory_usage_per_row(Rowset* rowset) { + // `num_rows_upt` could be zero after upgrading from an old version, // then we will return zero and no limit. - if ((rowset->num_rows_upt() * total_update_col_cnt) <= 0) return 0; - return rowset->total_update_row_size() / (rowset->num_rows_upt() * total_update_col_cnt); + if ((rowset->num_rows_upt()) <= 0) return 0; + return rowset->total_update_row_size() / rowset->num_rows_upt(); } // Read chunk from source segment file and call `update_func` to update it. // `update_func` accept ChunkUniquePtr and [start_rowid, end_rowid) range of this chunk. -static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& schema, Tablet* tablet, - OlapReaderStatistics* stats, int64_t version, - RowsetSegmentId rowset_seg_id, const std::string& path, - const std::function<Status(StreamChunkContainer)>& update_func) { +static Status read_from_source_segment_and_update( + Rowset* rowset, const Schema& schema, Tablet* tablet, OlapReaderStatistics* stats, int64_t version, + RowsetSegmentId rowset_seg_id, const std::string& path, + const std::function<Status(StreamChunkContainer, bool, int64_t)>& update_func) { CHECK_MEM_LIMIT("RowsetColumnUpdateState::read_from_source_segment"); ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path())); // We need to estimate each update rows size before it has been actually updated.
- const int64_t upt_memory_usage_per_row_column = calc_upt_memory_usage_per_row_column(rowset); + const int64_t upt_memory_usage_per_row = RowsetColumnUpdateState::calc_upt_memory_usage_per_row(rowset); auto segment = Segment::open(fs, FileInfo{path}, rowset_seg_id.segment_id, rowset->schema()); if (!segment.ok()) { LOG(WARNING) << "Fail to open " << path << ": " << segment.status(); @@ -354,13 +352,16 @@ static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& source_chunk_ptr->append(*tmp_chunk_ptr); // Avoid too many memory usage and Column overflow, we will limit source chunk's size. if (source_chunk_ptr->num_rows() >= INT32_MAX || - (int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row_column * (int64_t)schema.num_fields() > + (int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row > config::partial_update_memory_limit_per_worker) { + // Because we handle columns group by group (defined by config::vertical_compaction_max_columns_per_group), + // using `upt_memory_usage_per_row` to estimate the source chunk's future memory cost will overestimate it. + // But it is better to overestimate than to underestimate. StreamChunkContainer container = { .chunk_ptr = source_chunk_ptr.get(), .start_rowid = start_rowid, .end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())}; - RETURN_IF_ERROR(update_func(container)); + RETURN_IF_ERROR(update_func(container, true /*print log*/, upt_memory_usage_per_row)); start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows()); source_chunk_ptr->reset(); } @@ -371,7 +372,7 @@ static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& .chunk_ptr = source_chunk_ptr.get(), .start_rowid = start_rowid, .end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())}; - RETURN_IF_ERROR(update_func(container)); + RETURN_IF_ERROR(update_func(container, false /*print log*/, upt_memory_usage_per_row)); start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows()); source_chunk_ptr->reset(); } @@ -759,10 +760,15 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_ rowset->rowset_path(), rowsetid_segid.unique_rowset_id, rowsetid_segid.segment_id); RETURN_IF_ERROR(read_from_source_segment_and_update( rowset, partial_schema, tablet, &stats, latest_applied_version.major_number(), rowsetid_segid, - seg_path, [&](StreamChunkContainer container) { - VLOG(2) << "RowsetColumnUpdateState read from source segment: [byte usage: " - << container.chunk_ptr->bytes_usage() << " row cnt: " << container.chunk_ptr->num_rows() - << "] row range : [" << container.start_rowid << ", " << container.end_rowid << ")"; + seg_path, [&](StreamChunkContainer container, bool print_log, int64_t upt_memory_usage_per_row) { + if (print_log) { + LOG(INFO) << "RowsetColumnUpdateState read from source segment: tablet id:" + << tablet->tablet_id() << " [byte usage: " << container.chunk_ptr->bytes_usage() + << " row cnt: " << container.chunk_ptr->num_rows() << "] row range : [" + << container.start_rowid << ", " << container.end_rowid + << ") upt_memory_usage_per_row : " << upt_memory_usage_per_row + << " update column cnt : " << update_column_ids.size(); + } const size_t source_chunk_size = container.chunk_ptr->memory_usage(); tracker->consume(source_chunk_size); DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); }); diff --git a/be/src/storage/rowset_column_update_state.h b/be/src/storage/rowset_column_update_state.h index 6bba549141019f..66691e8bdd1c9a 100644 ---
a/be/src/storage/rowset_column_update_state.h +++ b/be/src/storage/rowset_column_update_state.h @@ -169,6 +169,8 @@ class RowsetColumnUpdateState { // For UT test now const std::vector& upserts() const { return _upserts; } + static int64_t calc_upt_memory_usage_per_row(Rowset* rowset); + private: Status _load_upserts(Rowset* rowset, MemTracker* update_mem_tracker, uint32_t start_idx, uint32_t* end_idx); diff --git a/be/src/storage/schema_change.cpp b/be/src/storage/schema_change.cpp index 264452d999c230..e0091ca6f496e3 100644 --- a/be/src/storage/schema_change.cpp +++ b/be/src/storage/schema_change.cpp @@ -193,7 +193,7 @@ Status HeapChunkMerger::merge(std::vector& chunk_arr, RowsetWriter* ro StorageEngine* storage_engine = StorageEngine::instance(); bool bg_worker_stopped = storage_engine->bg_worker_stopped(); while (!_heap.empty() && !bg_worker_stopped) { - if (tmp_chunk->capacity_limit_reached() || nread >= config::vector_chunk_size) { + if (!tmp_chunk->capacity_limit_reached().ok() || nread >= config::vector_chunk_size) { if (_tablet->keys_type() == KeysType::AGG_KEYS) { aggregate_chunk(*_aggregator, tmp_chunk, rowset_writer); } else { diff --git a/be/src/util/cpu_info.h b/be/src/util/cpu_info.h index 695c26c1576740..9585d69cf3de45 100644 --- a/be/src/util/cpu_info.h +++ b/be/src/util/cpu_info.h @@ -82,6 +82,18 @@ class CpuInfo { static std::string debug_string(); + static const std::vector& get_cache_sizes() { + static std::vector cache_sizes; + static std::vector cache_line_sizes; + + if (cache_sizes.empty()) { + cache_sizes.resize(NUM_CACHE_LEVELS); + cache_line_sizes.resize(NUM_CACHE_LEVELS); + _get_cache_info(cache_sizes.data(), cache_line_sizes.data()); + } + return cache_sizes; + } + private: /// Initialize NUMA-related state - called from Init(); static void _init_numa(); diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp index c5a8ff2d29f479..15b192c123251b 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -701,16 +701,26 @@ TEST_F(ParquetScannerTest, get_file_schema) { {"col_json_map_timestamp", TypeDescriptor::create_map_type(TypeDescriptor::from_logical_type(TYPE_DATETIME), TypeDescriptor::from_logical_type(TYPE_INT))}, - {"col_json_struct", TypeDescriptor::create_varchar_type(1048576)}, + {"col_json_struct", + TypeDescriptor::create_struct_type({"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT), + TypeDescriptor::create_varchar_type(1048576)})}, {"col_json_list_list", TypeDescriptor::create_array_type(TypeDescriptor::create_array_type( TypeDescriptor::from_logical_type(TYPE_INT)))}, {"col_json_map_list", TypeDescriptor::create_map_type( TypeDescriptor::create_varchar_type(1048576), TypeDescriptor::create_array_type(TypeDescriptor::from_logical_type(TYPE_INT)))}, - {"col_json_list_struct", TypeDescriptor::create_array_type(TypeDescriptor::create_varchar_type(1048576))}, - {"col_json_struct_struct", TypeDescriptor::create_varchar_type(1048576)}, - {"col_json_struct_string", TypeDescriptor::create_varchar_type(1048576)}, + {"col_json_list_struct", TypeDescriptor::create_array_type(TypeDescriptor::create_struct_type( + {"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT), + TypeDescriptor::create_varchar_type(1048576)}))}, + {"col_json_struct_struct", + TypeDescriptor::create_struct_type( + {"s0", "s1"}, + {TypeDescriptor::from_logical_type(TYPE_INT), + TypeDescriptor::create_struct_type({"s2"}, {TypeDescriptor::from_logical_type(TYPE_INT)})})}, + {"col_json_struct_string", + 
TypeDescriptor::create_struct_type({"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT), + TypeDescriptor::create_varchar_type(1048576)})}, {"col_json_json_string", TypeDescriptor::create_varchar_type(1048576)}}}, {test_exec_dir + "/test_data/parquet_data/decimal.parquet", {{"col_decimal32", TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL32, 9, 2)}, diff --git a/be/test/formats/disk_range_test.cpp b/be/test/formats/disk_range_test.cpp index d511cdb2bc54f2..20fcaec53b371c 100644 --- a/be/test/formats/disk_range_test.cpp +++ b/be/test/formats/disk_range_test.cpp @@ -41,6 +41,33 @@ TEST(DiskRangeTest, TestMergeTinyDiskRanges) { EXPECT_EQ(900 * KB, merged_disk_ranges.at(0).length()); } +// unordered DiskRange + DiskRange's length = 0 + duplicate DiskRange + Overlap DiskRange +TEST(DiskRangeTest, TestDiskRangesRobust) { + std::vector disk_ranges{}; + constexpr int64_t KB = 1024; + // unordered + disk_ranges.emplace_back(800 * KB, 100 * KB); + // duplicate + disk_ranges.emplace_back(0, 1 * KB); + disk_ranges.emplace_back(0, 1 * KB); + disk_ranges.emplace_back(0, 1 * KB); + // overlap + disk_ranges.emplace_back(10 * KB, 30 * KB); + disk_ranges.emplace_back(15 * KB, 45 * KB); + disk_ranges.emplace_back(850 * KB, 150 * KB); + // length = 0 + duplicate + disk_ranges.emplace_back(40 * KB, 0 * KB); + disk_ranges.emplace_back(900 * KB, 0 * KB); + disk_ranges.emplace_back(900 * KB, 0 * KB); + disk_ranges.emplace_back(900 * KB, 0 * KB); + std::vector merged_disk_ranges{}; + DiskRangeHelper::merge_adjacent_disk_ranges(disk_ranges, config::io_coalesce_read_max_distance_size, + config::io_coalesce_read_max_buffer_size, merged_disk_ranges); + EXPECT_EQ(1, merged_disk_ranges.size()); + EXPECT_EQ(0, merged_disk_ranges.at(0).offset()); + EXPECT_EQ(1000 * KB, merged_disk_ranges.at(0).length()); +} + TEST(DiskRangeTest, TestMergeBigDiskRanges) { std::vector disk_ranges{}; constexpr int64_t MB = 1024 * 1024; diff --git a/be/test/storage/kv_store_test.cpp b/be/test/storage/kv_store_test.cpp index 424a218e1c1b7a..0a4d548d52058c 100644 --- a/be/test/storage/kv_store_test.cpp +++ b/be/test/storage/kv_store_test.cpp @@ -155,35 +155,4 @@ TEST_F(KVStoreTest, TestOpDeleteRange) { } } -TEST_F(KVStoreTest, TestOpDeleteRangeFallback) { - // insert 10 keys - for (int i = 0; i < 10; i++) { - std::string key = fmt::format("key_{:016x}", i); - std::string value = fmt::format("val_{:016x}", i); - ASSERT_TRUE(_kv_store->put(META_COLUMN_FAMILY_INDEX, key, value).ok()); - } - for (int i = 0; i < 10; i++) { - std::string key = fmt::format("key_{:016x}", i); - std::string value_get; - ASSERT_TRUE(_kv_store->get(META_COLUMN_FAMILY_INDEX, key, &value_get).ok()); - ASSERT_TRUE(value_get == fmt::format("val_{:016x}", i)); - } - int32_t old_val = config::rocksdb_opt_delete_range_limit; - config::rocksdb_opt_delete_range_limit = 5; - // delete range from 0 ~ 9 - rocksdb::WriteBatch wb; - ASSERT_TRUE(_kv_store - ->OptDeleteRange(META_COLUMN_FAMILY_INDEX, fmt::format("key_{:016x}", 0), - fmt::format("key_{:016x}", 10), &wb) - .ok()); - ASSERT_TRUE(_kv_store->write_batch(&wb).ok()); - // check result - for (int i = 0; i < 10; i++) { - std::string key = fmt::format("key_{:016x}", i); - std::string value_get; - ASSERT_TRUE(_kv_store->get(META_COLUMN_FAMILY_INDEX, key, &value_get).is_not_found()); - } - config::rocksdb_opt_delete_range_limit = old_val; -} - } // namespace starrocks diff --git a/be/test/storage/rowset_column_partial_update_test.cpp b/be/test/storage/rowset_column_partial_update_test.cpp index 
2d0cc721cbc83a..c8ad63a18cb732 100644 --- a/be/test/storage/rowset_column_partial_update_test.cpp +++ b/be/test/storage/rowset_column_partial_update_test.cpp @@ -1219,6 +1219,10 @@ TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_source_chunk_limit) { return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2; } })); + // check `calc_upt_memory_usage_per_row` + for (int i = 10; i < 20; i++) { + ASSERT_TRUE(RowsetColumnUpdateState::calc_upt_memory_usage_per_row(rowsets[i].get()) > 0); + } config::vector_chunk_size = old_vector_chunk_size; config::partial_update_memory_limit_per_worker = old_partial_update_memory_limit_per_worker; final_check(tablet, rowsets); diff --git a/bin/start_backend.sh b/bin/start_backend.sh index 26fba7f493ad18..4e72334432aa45 100755 --- a/bin/start_backend.sh +++ b/bin/start_backend.sh @@ -219,7 +219,7 @@ if [ ${RUN_LOG_CONSOLE} -eq 1 ] ; then export GLOG_logtostderr=1 else # redirect stdout/stderr to ${LOG_FILE} - exec &>> ${LOG_FILE} + exec >> ${LOG_FILE} 2>&1 fi echo "start time: $(date), server uptime: $(uptime)" diff --git a/bin/start_fe.sh b/bin/start_fe.sh index aafcfd1e6583b9..a021571e26c1ad 100755 --- a/bin/start_fe.sh +++ b/bin/start_fe.sh @@ -225,7 +225,7 @@ if [ ${RUN_LOG_CONSOLE} -eq 1 ] ; then echo -e "\nsys_log_to_console = true" >> $STARROCKS_HOME/conf/fe.conf else # redirect all subsequent commands' stdout/stderr into $LOG_FILE - exec &>> $LOG_FILE + exec >> $LOG_FILE 2>&1 fi echo "using java version $JAVA_VERSION" diff --git a/docker/dockerfiles/toolchains/toolchains-centos7.Dockerfile b/docker/dockerfiles/toolchains/toolchains-centos7.Dockerfile index c0441a1170b3ad..239e665f941c9f 100644 --- a/docker/dockerfiles/toolchains/toolchains-centos7.Dockerfile +++ b/docker/dockerfiles/toolchains/toolchains-centos7.Dockerfile @@ -3,6 +3,7 @@ # DOCKER_BUILDKIT=1 docker build --rm=true -f docker/dockerfiles/toolchains/toolchains-centos7.Dockerfile -t toolchains-centos7:latest docker/dockerfiles/toolchains/ ARG GCC_INSTALL_HOME=/opt/rh/gcc-toolset-10/root/usr +ARG GCC_10_DOWNLOAD_URL=https://ftp.gnu.org/gnu/gcc/gcc-10.3.0/gcc-10.3.0.tar.gz ARG GCC_DOWNLOAD_URL=https://ftp.gnu.org/gnu/gcc/gcc-14.2.0/gcc-14.2.0.tar.gz ARG CMAKE_INSTALL_HOME=/opt/cmake ARG MAVEN_VERSION=3.6.3 @@ -10,7 +11,7 @@ ARG MAVEN_INSTALL_HOME=/opt/maven # Can't upgrade to a later version, due to incompatible changes between 2.31 and 2.32 ARG BINUTILS_DOWNLOAD_URL=https://ftp.gnu.org/gnu/binutils/binutils-2.30.tar.bz2 # install epel-release directly from the url link -ARG EPEL_RPM_URL=https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm +ARG EPEL_RPM_URL=https://archives.fedoraproject.org/pub/archive/epel/7/x86_64/Packages/e/epel-release-7-14.noarch.rpm FROM centos:centos7 AS fixed-centos7-image # Fix the centos mirrorlist, due to official list is gone after EOL @@ -24,8 +25,18 @@ RUN yum install -y gcc gcc-c++ make automake curl wget gzip gunzip zip bzip2 fil FROM base-builder AS gcc-builder ARG GCC_INSTALL_HOME +ARG GCC_10_DOWNLOAD_URL ARG GCC_DOWNLOAD_URL -RUN mkdir -p /workspace/gcc && \ +# build gcc-10 +RUN mkdir -p /workspace/gcc-10 && \ + cd /workspace/gcc-10 && \ + wget --progress=dot:mega --no-check-certificate $GCC_10_DOWNLOAD_URL -O ../gcc-10.tar.gz && \ + tar -xzf ../gcc-10.tar.gz --strip-components=1 && \ + ./contrib/download_prerequisites && \ + ./configure --disable-multilib --enable-languages=c,c++ --prefix=/workspace/gcc-10/install +RUN cd /workspace/gcc-10 && make -j`nproc` && make install +# build gcc-14 +RUN mkdir -p 
/workspace/gcc && export CC=/workspace/gcc-10/install/bin/gcc && export CXX=/workspace/gcc-10/install/bin/g++ && \ cd /workspace/gcc && \ wget --progress=dot:mega --no-check-certificate $GCC_DOWNLOAD_URL -O ../gcc.tar.gz && \ tar -xzf ../gcc.tar.gz --strip-components=1 && \ @@ -57,9 +68,8 @@ ARG EPEL_RPM_URL LABEL org.opencontainers.image.source="https://github.com/starrocks/starrocks" -RUN yum-config-manager --add-repo https://cli.github.com/packages/rpm/gh-cli.repo && yum install -y gh - -RUN yum install -y ${EPEL_RPM_URL} && yum install -y wget unzip bzip2 patch bison byacc flex autoconf automake make \ +RUN yum-config-manager --add-repo https://cli.github.com/packages/rpm/gh-cli.repo && yum install -y gh && \ + yum install -y ${EPEL_RPM_URL} && yum install -y wget unzip bzip2 patch bison byacc flex autoconf automake make \ libtool which git ccache binutils-devel python3 file java-11-openjdk java-11-openjdk-devel java-11-openjdk-jmods less psmisc && \ yum clean all && rm -rf /var/cache/yum diff --git a/docs/docusaurus/sidebars.json b/docs/docusaurus/sidebars.json index 3707210215cf25..66d7c62456c755 100644 --- a/docs/docusaurus/sidebars.json +++ b/docs/docusaurus/sidebars.json @@ -84,10 +84,12 @@ "id": "deployment/shared_data/shared_data" }, "items": [ + "deployment/shared_data/hdfs", + "deployment/shared_data/other", "deployment/shared_data/s3", "deployment/shared_data/gcs", "deployment/shared_data/azure", "deployment/shared_data/minio", "deployment/shared_data/feature-support-shared-data" ] }, @@ -335,7 +338,8 @@ ] }, "using_starrocks/mv_ref", - "using_starrocks/troubleshooting_asynchronous_materialized_views" + "using_starrocks/troubleshooting_asynchronous_materialized_views", + "using_starrocks/feature-support-asynchronous-materialized-views" ] }, "using_starrocks/Colocate_join", diff --git a/docs/en/administration/management/BE_configuration.md b/docs/en/administration/management/BE_configuration.md index b5aab698fc17cb..46fac975afaae8 100644 --- a/docs/en/administration/management/BE_configuration.md +++ b/docs/en/administration/management/BE_configuration.md @@ -4455,7 +4455,7 @@ When this value is set to less than `0`, the system uses the product of its abso