Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support Cluster Snapshot Backup: support system table for cluster snapshot backup (part2) #54508

Merged
merged 4 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ set(EXEC_FILES
schema_scanner/sys_fe_locks.cpp
schema_scanner/sys_fe_memory_usage.cpp
schema_scanner/schema_temp_tables_scanner.cpp
schema_scanner/schema_cluster_snapshots_scanner.cpp
schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp
jdbc_scanner.cpp
sorting/compare_column.cpp
sorting/merge_column.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.h"

#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "types/logical_type.h"

namespace starrocks {

SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = {
{"snapshot_name", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"job_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"created_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"finished_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"state", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"detail_info", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"error_message", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},

};

SchemaClusterSnapshotJobsScanner::SchemaClusterSnapshotJobsScanner()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}

SchemaClusterSnapshotJobsScanner::~SchemaClusterSnapshotJobsScanner() = default;

Status SchemaClusterSnapshotJobsScanner::start(RuntimeState* state) {
RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
RETURN_IF(!_param->ip || !_param->port, Status::InternalError("IP or port not exists"));

RETURN_IF_ERROR(SchemaScanner::start(state));
RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));

TClusterSnapshotJobsRequest request;
return SchemaHelper::get_cluster_snapshot_jobs_info(_ss_state, request, &_result);
}

Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotJobsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), info.job_id,
info.created_time, info.finished_time,
Slice(info.state), Slice(info.detail_info),
Slice(info.error_message),
};
for (const auto& [slot_id, index] : slot_id_map) {
Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
column->append_datum(datum_array[slot_id - 1]);
}
_index++;
return {};
}

Status SchemaClusterSnapshotJobsScanner::get_next(ChunkPtr* chunk, bool* eos) {
RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
RETURN_IF((nullptr == chunk || nullptr == eos), Status::InternalError("input pointer is nullptr."));

if (_index >= _result.items.size()) {
*eos = true;
return Status::OK();
}
*eos = false;
return _fill_chunk(chunk);
}

} // namespace starrocks
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Memory leak or undefined behavior due to incorrect use of Slice with StringValue.

You can modify the code like this:

Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
    auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
    const TClusterSnapshotJobsItem& info = _result.items[_index];
    DatumArray datum_array{
            Slice(info.snapshot_name.data(), info.snapshot_name.size()), 
            info.job_id, 
            info.created_time, 
            info.finished_time,
            Slice(info.state.data(), info.state.size()), 
            Slice(info.detail_info.data(), info.detail_info.size()), 
            Slice(info.error_message.data(), info.error_message.size()),
    };
    for (const auto& [slot_id, index] : slot_id_map) {
        Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
        column->append_datum(datum_array[slot_id - 1]);
    }
    _index++;
    return {};
}

In the original code, creating a Slice from info.snapshot_name, info.state, etc. directly assumes these values have valid data and size methods or properties, which may not be true or safe, especially if they have not been previously initialized or allocated correctly. Adjusting it as shown explicitly constructs the Slice with known data pointers and sizes.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/schema_scanner.h"
#include "gen_cpp/FrontendService_types.h"

namespace starrocks {

class SchemaClusterSnapshotJobsScanner : public SchemaScanner {
public:
SchemaClusterSnapshotJobsScanner();
~SchemaClusterSnapshotJobsScanner() override;
Status start(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

private:
Status _fill_chunk(ChunkPtr* chunk);

size_t _index = 0;
TClusterSnapshotJobsResponse _result;
static SchemaScanner::ColumnDesc _s_columns[];
};

} // namespace starrocks
84 changes: 84 additions & 0 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/schema_scanner/schema_cluster_snapshots_scanner.h"

#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "types/logical_type.h"

namespace starrocks {

SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
{"snapshot_name", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"snapshot_type", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"created_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"finished_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"fe_jouranl_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"starmgr_jouranl_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"properties", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"storage_volume", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"storage_path", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},

};

SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}

SchemaClusterSnapshotsScanner::~SchemaClusterSnapshotsScanner() = default;

Status SchemaClusterSnapshotsScanner::start(RuntimeState* state) {
RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
RETURN_IF(!_param->ip || !_param->port, Status::InternalError("IP or port not exists"));

RETURN_IF_ERROR(SchemaScanner::start(state));
RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));

TClusterSnapshotsRequest request;
return SchemaHelper::get_cluster_snapshots_info(_ss_state, request, &_result);
}

Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), Slice(info.snapshot_type), info.created_time, info.finished_time,

info.fe_jouranl_id, info.starmgr_jouranl_id, Slice(info.properties), Slice(info.storage_volume),

Slice(info.storage_path),
};
for (const auto& [slot_id, index] : slot_id_map) {
Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
column->append_datum(datum_array[slot_id - 1]);
}
_index++;
return {};
}

Status SchemaClusterSnapshotsScanner::get_next(ChunkPtr* chunk, bool* eos) {
RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
RETURN_IF((nullptr == chunk || nullptr == eos), Status::InternalError("input pointer is nullptr."));

if (_index >= _result.items.size()) {
*eos = true;
return Status::OK();
}
*eos = false;
return _fill_chunk(chunk);
}

} // namespace starrocks
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
A potential buffer overflow or incorrect memory size calculation for varchar-based fields using sizeof(StringValue) instead of a proper max length.

You can modify the code like this:

SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
        {"snapshot_name", TypeDescriptor::create_varchar_type(MAX_SNAPSHOT_NAME_LENGTH), MAX_SNAPSHOT_NAME_LENGTH, true},
        {"snapshot_type", TypeDescriptor::create_varchar_type(MAX_SNAPSHOT_TYPE_LENGTH), MAX_SNAPSHOT_TYPE_LENGTH, true},
        {"created_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
        {"finished_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
        {"fe_jouranl_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
        {"starmgr_jouranl_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
        {"properties", TypeDescriptor::create_varchar_type(MAX_PROPERTIES_LENGTH), MAX_PROPERTIES_LENGTH, true},
        {"storage_volume", TypeDescriptor::create_varchar_type(MAX_STORAGE_VOLUME_LENGTH), MAX_STORAGE_VOLUME_LENGTH, true},
        {"storage_path", TypeDescriptor::create_varchar_type(MAX_STORAGE_PATH_LENGTH), MAX_STORAGE_PATH_LENGTH, true},
};

// Define appropriate constants for maximum lengths
const int MAX_SNAPSHOT_NAME_LENGTH = 256;
const int MAX_SNAPSHOT_TYPE_LENGTH = 256;
const int MAX_PROPERTIES_LENGTH = 1024;
const int MAX_STORAGE_VOLUME_LENGTH = 256;
const int MAX_STORAGE_PATH_LENGTH = 1024;

Explanation: The original code uses sizeof(StringValue) to define the varchar type size, which is wrong because sizeof(StringValue) returns the size of the pointer structure but does not allocate sufficient space for the actual content. This can lead to potential overflows or incorrect behavior. Proper constants should be defined and used to represent the maximum allowed sizes for each varchar field.

37 changes: 37 additions & 0 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/schema_scanner.h"
#include "gen_cpp/FrontendService_types.h"

namespace starrocks {

class SchemaClusterSnapshotsScanner : public SchemaScanner {
public:
SchemaClusterSnapshotsScanner();
~SchemaClusterSnapshotsScanner() override;
Status start(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

private:
Status _fill_chunk(ChunkPtr* chunk);

size_t _index = 0;
TClusterSnapshotsResponse _result;
static SchemaScanner::ColumnDesc _s_columns[];
};

} // namespace starrocks
12 changes: 12 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ Status SchemaHelper::get_analyze_status(const SchemaScannerState& state, const T
});
}

Status SchemaHelper::get_cluster_snapshots_info(const SchemaScannerState& state, const TClusterSnapshotsRequest& req,
TClusterSnapshotsResponse* res) {
return _call_rpc(state,
[&req, &res](FrontendServiceConnection& client) { client->getClusterSnapshotsInfo(*res, req); });
}

Status SchemaHelper::get_cluster_snapshot_jobs_info(const SchemaScannerState& state, const TClusterSnapshotJobsRequest& req,
TClusterSnapshotJobsResponse* res) {
return _call_rpc(
state, [&req, &res](FrontendServiceConnection& client) { client->getClusterSnapshotJobsInfo(*res, req); });
}

void fill_data_column_with_null(Column* data_column) {
auto* nullable_column = down_cast<NullableColumn*>(data_column);
nullable_column->append_nulls(1);
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ class SchemaHelper {
static Status get_analyze_status(const SchemaScannerState& state, const TAnalyzeStatusReq& var_params,
TAnalyzeStatusRes* var_result);

static Status get_cluster_snapshots_info(const SchemaScannerState& state, const TClusterSnapshotsRequest& req,
TClusterSnapshotsResponse* res);

static Status get_cluster_snapshot_jobs_info(const SchemaScannerState& state,
const TClusterSnapshotJobsRequest& req,
TClusterSnapshotJobsResponse* res);

private:
static Status _call_rpc(const SchemaScannerState& state,
std::function<void(ClientConnection<FrontendServiceClient>&)> callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ public class SystemId {
// ==================== Statistics =========================== //
public static final long COLUMN_STATS_USAGE = 150L;
public static final long ANALYZE_STATUS = 151L;

// ================== Cluster Snapshot ======================= //
public static final long CLUSTER_SNAPSHOTS_ID = 160L;
public static final long CLUSTER_SNAPSHOT_JOBS_ID = 161L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.catalog.system.information;

import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.SystemTable;
import com.starrocks.thrift.TSchemaTableType;

import static com.starrocks.catalog.system.SystemTable.NAME_CHAR_LEN;
import static com.starrocks.catalog.system.SystemTable.builder;

public class ClusterSnapshotJobsTable {
public static final String NAME = "cluster_snapshot_jobs";

public static SystemTable create() {
return new SystemTable(SystemId.CLUSTER_SNAPSHOT_JOBS_ID,
NAME,
Table.TableType.SCHEMA,
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATE_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN))
.build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOT_JOBS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.catalog.system.information;

import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.SystemTable;
import com.starrocks.thrift.TSchemaTableType;

import static com.starrocks.catalog.system.SystemTable.NAME_CHAR_LEN;
import static com.starrocks.catalog.system.SystemTable.builder;

public class ClusterSnapshotsTable {
public static final String NAME = "cluster_snapshots";

public static SystemTable create() {
return new SystemTable(SystemId.CLUSTER_SNAPSHOTS_ID,
NAME,
Table.TableType.SCHEMA,
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("CREATE_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("STORAGE_VOLUME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("STORAGE_PATH", ScalarType.createVarchar(NAME_CHAR_LEN))
.build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOTS);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
TSchemaTableType.SCH_CLUSTER_SNAPSHOTS should match the context and might be incorrect if not properly defined elsewhere.

You can modify the code like this:

public class ClusterSnapshotsTable {
    public static final String NAME = "cluster_snapshots";

    public static SystemTable create() {
        return new SystemTable(SystemId.CLUSTER_SNAPSHOTS_ID,
                NAME,
                Table.TableType.SCHEMA,
                builder()
                        .column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
                        .column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
                        .column("CREATE_TIME", ScalarType.createType(PrimitiveType.BIGINT))
                        .column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
                        .column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
                        .column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
                        .column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
                        .column("STORAGE_VOLUME", ScalarType.createVarchar(NAME_CHAR_LEN))
                        .column("STORAGE_PATH", ScalarType.createVarchar(NAME_CHAR_LEN))
                        .build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOTS // Ensure this value is valid for your use case.
        );
    }
}

Ensure TSchemaTableType.SCH_CLUSTER_SNAPSHOTS is accurately defined in the context of your application, as it may represent a generic or outdated placeholder that doesn't match your schema expectations.

Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public InfoSchemaDb(String catalogName) {
super.registerTableUnlocked(TemporaryTablesTable.create());
super.registerTableUnlocked(ColumnStatsUsageSystemTable.create());
super.registerTableUnlocked(AnalyzeStatusSystemTable.create());
super.registerTableUnlocked(ClusterSnapshotsTable.create());
super.registerTableUnlocked(ClusterSnapshotJobsTable.create());
}
}

Expand Down
Loading
Loading