Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into 1216-security-view
Browse files Browse the repository at this point in the history
  • Loading branch information
HangyuanLiu committed Dec 27, 2024
2 parents 1cc553e + a105bac commit e7e708b
Show file tree
Hide file tree
Showing 611 changed files with 17,684 additions and 4,766 deletions.
26 changes: 11 additions & 15 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,9 @@ if ("${USE_STAROS}" STREQUAL "ON")
find_package(azure-storage-common-cpp CONFIG REQUIRED)
set(azure-storage-blobs-cpp_DIR "${STARLET_INSTALL_DIR}/third_party/share/azure-storage-blobs-cpp" CACHE PATH "azure storage blobs search path")
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
set(AZURE_SDK_LIB Azure::azure-identity Azure::azure-storage-blobs Azure::azure-core Azure::azure-storage-common)
set(azure-storage-files-datalake-cpp_DIR "${STARLET_INSTALL_DIR}/third_party/share/azure-storage-files-datalake-cpp" CACHE PATH "azure storage file datalake search path")
find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED)
set(AZURE_SDK_LIB Azure::azure-identity Azure::azure-storage-blobs Azure::azure-storage-files-datalake Azure::azure-core Azure::azure-storage-common)

set(starlet_DIR "${STARLET_INSTALL_DIR}/starlet_install/lib/cmake" CACHE PATH "starlet search path")
find_package(starlet CONFIG REQUIRED)
Expand Down Expand Up @@ -859,7 +861,7 @@ set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES}
set_target_properties(aws-cpp-sdk-core PROPERTIES INTERFACE_LINK_LIBRARIES AWS::aws-crt-cpp)

if (STARROCKS_JIT_ENABLE)
set(STARROCKS_DEPENDENCIES
set(STARROCKS_DEPENDENCIES
${STARROCKS_DEPENDENCIES}
${WL_START_GROUP}
${LLVM_LIBRARIES}
Expand Down Expand Up @@ -969,7 +971,6 @@ endif()
# Add all external dependencies. They should come after the starrocks libs.

set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS}
${STARROCKS_LINK_LIBS}
${STARROCKS_DEPENDENCIES}
hdfs
jvm
Expand All @@ -979,33 +980,28 @@ set(BUILD_FOR_SANITIZE "OFF")
# Add sanitize static link flags or jemalloc
if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR "${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
message("use jemalloc")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS jemalloc)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} jemalloc)
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libsan)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libsan)
else()
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libasan)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libasan)
endif()
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "LSAN")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-liblsan)
set(BUILD_FOR_SANITIZE "ON")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-liblsan)
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
message("use jemalloc")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libubsan jemalloc)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libubsan jemalloc)
set(BUILD_FOR_SANITIZE "ON")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
set(STARROCKS_MEMORY_DEPENDENCIES_LIBS -static-libtsan)
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} -static-libtsan)
set(BUILD_FOR_SANITIZE "ON")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif()

if (NOT BUILD_FORMAT_LIB)
# skip the STARROCKS_MEMORY_DEPENDENCIES_LIBS only when BUILD_FORMAT_LIB=ON
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} ${STARROCKS_MEMORY_DEPENDENCIES_LIBS})
endif()

if (NOT ("${MAKE_TEST}" STREQUAL "ON" AND "${BUILD_FOR_SANITIZE}" STREQUAL "ON"))
# In other words, turn to dynamic link when MAKE_TEST and BUILD_TYPE == *SAN
# otherwise do static link gcc's lib
Expand Down
4 changes: 4 additions & 0 deletions be/src/column/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class Field {
_name(rhs._name),
_type(rhs._type),
_sub_fields(rhs._sub_fields ? new std::vector<Field>(*rhs._sub_fields) : nullptr),
_length(rhs._length),
_short_key_length(rhs._short_key_length),
_flags(rhs._flags),
_uid(rhs._uid) {}
Expand All @@ -95,6 +96,7 @@ class Field {
_name(std::move(rhs._name)),
_type(std::move(rhs._type)),
_sub_fields(rhs._sub_fields),
_length(rhs._length),
_short_key_length(rhs._short_key_length),
_flags(rhs._flags),
_uid(rhs._uid) {
Expand All @@ -108,6 +110,7 @@ class Field {
_name = rhs._name;
_type = rhs._type;
_agg_method = rhs._agg_method;
_length = rhs._length;
_agg_state_desc = rhs._agg_state_desc;
_short_key_length = rhs._short_key_length;
_flags = rhs._flags;
Expand All @@ -123,6 +126,7 @@ class Field {
_name = std::move(rhs._name);
_type = std::move(rhs._type);
_agg_method = rhs._agg_method;
_length = rhs._length;
_agg_state_desc = rhs._agg_state_desc;
_short_key_length = rhs._short_key_length;
_flags = rhs._flags;
Expand Down
20 changes: 16 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ CONF_String(mem_limit, "90%");

// Enable the jemalloc tracker, which is responsible for reserving memory
CONF_Bool(enable_jemalloc_memory_tracker, "true");
// Alpha number of jemalloc memory fragmentation ratio, should in range (0, 1)
CONF_mDouble(jemalloc_fragmentation_ratio, "0.3");

// The port heartbeat service used.
CONF_Int32(heartbeat_service_port, "9050");
Expand Down Expand Up @@ -874,7 +872,7 @@ CONF_Int64(object_storage_connect_timeout_ms, "-1");
// Note that for Curl this config is converted to seconds by rounding down to the nearest whole second except when the
// value is greater than 0 and less than 1000.
// When it's 0, low speed limit check will be disabled.
CONF_Int64(object_storage_request_timeout_ms, "-1");
CONF_mInt64(object_storage_request_timeout_ms, "-1");
// Request timeout for object storage specialized for rename_file operation.
// if this parameter is 0, use object_storage_request_timeout_ms instead.
CONF_Int64(object_storage_rename_file_request_timeout_ms, "30000");
Expand Down Expand Up @@ -1033,6 +1031,9 @@ CONF_mInt32(starlet_fs_read_prefetch_threadpool_size, "128");
CONF_mInt32(starlet_fslib_s3client_nonread_max_retries, "5");
CONF_mInt32(starlet_fslib_s3client_nonread_retry_scale_factor, "200");
CONF_mInt32(starlet_fslib_s3client_connect_timeout_ms, "1000");
// Make starlet_fslib_s3client_request_timeout_ms an alias of object_storage_request_timeout_ms.
// NOTE: need to handle the negative value properly
CONF_Alias(object_storage_request_timeout_ms, starlet_fslib_s3client_request_timeout_ms);
CONF_mInt32(starlet_delete_files_max_key_in_batch, "1000");
#endif

Expand Down Expand Up @@ -1064,6 +1065,7 @@ CONF_mInt32(lake_pk_index_sst_max_compaction_versions, "100");
// When the ratio of cumulative level to base level is greater than this config, use base merge.
CONF_mDouble(lake_pk_index_cumulative_base_compaction_ratio, "0.1");
CONF_Int32(lake_pk_index_block_cache_limit_percent, "10");
CONF_mBool(lake_clear_corrupted_cache, "true");

CONF_mBool(dependency_librdkafka_debug_enable, "false");

Expand Down Expand Up @@ -1445,7 +1447,7 @@ CONF_Bool(report_python_worker_error, "true");
CONF_Bool(python_worker_reuse, "true");
CONF_Int32(python_worker_expire_time_sec, "300");
CONF_mBool(enable_pk_strict_memcheck, "true");
CONF_mBool(skip_lake_pk_preload, "false");
CONF_mBool(skip_pk_preload, "false");
// Reduce core file size by not dumping jemalloc retain pages
CONF_mBool(enable_core_file_size_optimization, "true");
// Current supported modules:
Expand Down Expand Up @@ -1506,6 +1508,12 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

// The time that stream load pipe waits for the input. The pipe will block the pipeline scan executor
// until the input is available or the timeout is reached. Don't set this value too large to avoid
// blocking the pipeline scan executor for a long time.
CONF_mInt32(merge_commit_stream_load_pipe_block_wait_us, "500");
// The maximum number of bytes that the merge commit stream load pipe can buffer.
CONF_mInt64(merge_commit_stream_load_pipe_max_buffered_bytes, "1073741824");
CONF_Int32(batch_write_thread_pool_num_min, "0");
CONF_Int32(batch_write_thread_pool_num_max, "512");
CONF_Int32(batch_write_thread_pool_queue_size, "4096");
Expand All @@ -1516,6 +1524,10 @@ CONF_mInt32(batch_write_rpc_reqeust_timeout_ms, "10000");
CONF_mInt32(batch_write_poll_load_status_interval_ms, "200");
CONF_mBool(batch_write_trace_log_enable, "false");

CONF_mBool(enable_load_spill, "false");
// Max chunk bytes which allow to spill per flush. Default is 10MB.
CONF_mInt64(load_spill_max_chunk_bytes, "10485760");

// ignore union type tag in avro kafka routine load
CONF_mBool(avro_ignore_union_type_tag, "false");

Expand Down
30 changes: 0 additions & 30 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ static void retrieve_jemalloc_stats(JemallocStats* stats) {
// Track the memory usage of jemalloc
void jemalloc_tracker_daemon(void* arg_this) {
auto* daemon = static_cast<Daemon*>(arg_this);
double smoothed_fragmentation = 0;
while (!daemon->stopped()) {
JemallocStats stats;
retrieve_jemalloc_stats(&stats);
Expand All @@ -223,35 +222,6 @@ void jemalloc_tracker_daemon(void* arg_this) {
tracker->consume(delta);
}

// Fragmentation and dirty memory:
// Jemalloc retains some dirty memory and gradually returns it to the OS, which cannot be reused by the application.
// Failing to account for this memory in the MemoryTracker may lead to memory allocation failures or even process
// termination by the OS; however, retaining excessive memory in the tracker is also wasteful.
// To address this, we employ a smoothing formula to track fragmentation and dirty memory, which also mitigates
// the impact of sudden memory releases, such as those occurring when a large query is executed:
// S_t = \exp\left(\alpha \cdot \log(1 + x_t) + (1 - \alpha) \cdot \log(1 + S_{t-1})\right) - 1
if (GlobalEnv::GetInstance()->jemalloc_fragmentation_traker()) {
if (stats.resident > 0 && stats.allocated > 0 && stats.metadata > 0) {
double fragmentation = stats.resident - stats.allocated - stats.metadata;
if (fragmentation < 0) fragmentation = 0;

// log transformation
double alpha = std::clamp(config::jemalloc_fragmentation_ratio, 0.1, 0.9);
fragmentation = std::log1p(fragmentation);
// smoothing
fragmentation = alpha * fragmentation + smoothed_fragmentation * (1 - alpha);
// restore the log value
smoothed_fragmentation = fragmentation;
fragmentation = std::expm1(fragmentation);

if (fragmentation >= 0) {
auto tracker = GlobalEnv::GetInstance()->jemalloc_fragmentation_traker();
int64_t delta = fragmentation - tracker->consumption();
tracker->consume(delta);
}
}
}

nap_sleep(1, [daemon] { return daemon->stopped(); });
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class STATUS_ATTRIBUTE Status {

bool is_end_of_file() const { return code() == TStatusCode::END_OF_FILE; }

bool is_internal_error() const { return code() == TStatusCode::INTERNAL_ERROR; }

bool is_ok_or_eof() const { return ok() || is_end_of_file(); }

bool is_not_found() const { return code() == TStatusCode::NOT_FOUND; }
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/statusor.h
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,11 @@ inline std::ostream& operator<<(std::ostream& os, const StatusOr<T>& st) {
RETURN_IF_ERROR(varname); \
lhs = std::move(varname).value();

// Evaluates `rhs` once and binds the result to `varname`, checks it with ASSERT_OK,
// then move-assigns the contained value into `lhs`.
// The expansion is several statements (not wrapped in a block), so it must not be used
// as the sole body of an unbraced if/else/loop.
// NOTE(review): ASSERT_OK is not defined in the visible part of this header — presumably
// a test-only assertion helper; confirm it is in scope wherever this macro is expanded.
#define ASSIGN_OR_ASSERT_FAIL_IMPL(varname, lhs, rhs) \
auto&& varname = (rhs); \
ASSERT_OK(varname); \
lhs = std::move(varname).value();

// ASSIGN_OR_RETURN is modelled after Apache Arrow's ARROW_ASSIGN_OR_RAISE macro.
//
// Execute an expression that returns a StatusOr, extracting its value
Expand All @@ -732,6 +737,7 @@ inline std::ostream& operator<<(std::ostream& os, const StatusOr<T>& st) {
// WARNING: ASSIGN_OR_RETURN `std::move`s its right operand. If you have
// an lvalue StatusOr which you *don't* want to move out of, cast appropriately.
#define ASSIGN_OR_RETURN(lhs, rhs) ASSIGN_OR_RETURN_IMPL(VARNAME_LINENUM(value_or_err), lhs, rhs)
#define ASSIGN_OR_ASSERT_FAIL(lhs, rhs) ASSIGN_OR_ASSERT_FAIL_IMPL(VARNAME_LINENUM(value_or_err), lhs, rhs)

#define ASSIGN_OR_SET_STATUS_AND_RETURN_IF_ERROR_IMPL(err_status, lhs, rhs) \
auto&& varname = (rhs); \
Expand Down
Loading

0 comments on commit e7e708b

Please sign in to comment.