-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature] Support Cluster Snapshot Backup: support system table for cluster snapshot backup (part2) #54508
[Feature] Support Cluster Snapshot Backup: support system table for cluster snapshot backup (part2) #54508
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include "exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.h" | ||
|
||
#include "exec/schema_scanner/schema_helper.h" | ||
#include "gen_cpp/FrontendService_types.h" | ||
#include "runtime/runtime_state.h" | ||
#include "runtime/string_value.h" | ||
#include "types/logical_type.h" | ||
|
||
namespace starrocks { | ||
|
||
// Column layout of the cluster snapshot jobs system table as exposed by this scanner.
// Entries appear in the same order as the values placed in _fill_chunk's datum array.
// Names are spelled like the FE-side ClusterSnapshotJobsTable columns (lower-cased) so
// both definitions of the table agree.
// NOTE(review): whether columns are resolved by name or by position is not visible from
// this file -- identical spelling is correct in either case; confirm against the scan node.
SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = {
        {"snapshot_name", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        {"job_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        // Previously "created_time"; the FE table declares this column as CREATE_TIME.
        {"create_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        {"finished_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        {"state", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        {"detail_info", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        {"error_message", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
};
|
||
// Hands the static column table to the base scanner. The element count is derived from
// the array itself, so it follows any change made to _s_columns.
SchemaClusterSnapshotJobsScanner::SchemaClusterSnapshotJobsScanner()
        : SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(_s_columns[0])) {}

// Defaulted: no hand-written teardown is performed here.
SchemaClusterSnapshotJobsScanner::~SchemaClusterSnapshotJobsScanner() = default;
|
||
// Starts the scan: validates the scanner, runs the base-class start and scanner-state
// setup, then asks SchemaHelper for the cluster snapshot jobs and stores the reply in
// _result for get_next() to iterate over.
//
// Returns InternalError if the scanner has not been initialised or if the scanner
// parameters carry no IP or port; otherwise the first failing status of the steps above.
Status SchemaClusterSnapshotJobsScanner::start(RuntimeState* state) {
    RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
    RETURN_IF(!(_param->ip && _param->port), Status::InternalError("IP or port not exists"));

    RETURN_IF_ERROR(SchemaScanner::start(state));
    RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));

    // The request is sent default-constructed; no field is set on it here.
    TClusterSnapshotJobsRequest jobs_request;
    return SchemaHelper::get_cluster_snapshot_jobs_info(_ss_state, jobs_request, &_result);
}
|
||
// Appends the result row at position _index to `chunk` (one value per requested slot)
// and then advances _index by one.
//
// Slot ids are used as 1-based positions into the per-row datum array. Every slot id is
// validated before anything is appended, so an unexpected id yields InternalError instead
// of an out-of-range array access, and no column is left with a partially written row.
Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
    const auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
    const TClusterSnapshotJobsItem& info = _result.items[_index];

    // One datum per entry of _s_columns, in the same order.
    DatumArray datum_array{
            Slice(info.snapshot_name), info.job_id,
            info.created_time,         info.finished_time,
            Slice(info.state),         Slice(info.detail_info),
            Slice(info.error_message),
    };

    // First pass: make sure every requested slot maps onto a known column.
    for (const auto& [slot_id, index] : slot_id_map) {
        if (slot_id < 1 || static_cast<size_t>(slot_id) > datum_array.size()) {
            return Status::InternalError("invalid slot id for cluster_snapshot_jobs");
        }
    }

    // Second pass: all ids are in range, so indexing is safe.
    for (const auto& [slot_id, index] : slot_id_map) {
        Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
        column->append_datum(datum_array[slot_id - 1]);
    }
    _index++;
    return {};
}
|
||
// Produces the next row of the fetched result, one row per call.
//
// Sets *eos to true and returns OK once every item of _result has been consumed;
// otherwise sets *eos to false and returns the status of _fill_chunk.
// Returns InternalError if the scanner is not initialised or either pointer is null.
Status SchemaClusterSnapshotJobsScanner::get_next(ChunkPtr* chunk, bool* eos) {
    RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
    RETURN_IF(!(chunk != nullptr && eos != nullptr), Status::InternalError("input pointer is nullptr."));

    const bool drained = _index >= _result.items.size();
    *eos = drained;
    if (drained) {
        return Status::OK();
    }
    return _fill_chunk(chunk);
}
|
||
} // namespace starrocks | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#pragma once | ||
|
||
#include "exec/schema_scanner.h" | ||
#include "gen_cpp/FrontendService_types.h" | ||
|
||
namespace starrocks { | ||
|
||
class SchemaClusterSnapshotJobsScanner : public SchemaScanner { | ||
public: | ||
SchemaClusterSnapshotJobsScanner(); | ||
~SchemaClusterSnapshotJobsScanner() override; | ||
Status start(RuntimeState* state) override; | ||
Status get_next(ChunkPtr* chunk, bool* eos) override; | ||
|
||
private: | ||
Status _fill_chunk(ChunkPtr* chunk); | ||
|
||
size_t _index = 0; | ||
TClusterSnapshotJobsResponse _result; | ||
static SchemaScanner::ColumnDesc _s_columns[]; | ||
}; | ||
|
||
} // namespace starrocks |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#include "exec/schema_scanner/schema_cluster_snapshots_scanner.h" | ||
|
||
#include "exec/schema_scanner/schema_helper.h" | ||
#include "gen_cpp/FrontendService_types.h" | ||
#include "runtime/runtime_state.h" | ||
#include "runtime/string_value.h" | ||
#include "types/logical_type.h" | ||
|
||
namespace starrocks { | ||
|
||
// Column layout of the cluster snapshots system table as exposed by this scanner.
// Entries appear in the same order as the values placed in _fill_chunk's datum array.
// Names are spelled like the FE-side ClusterSnapshotsTable columns (lower-cased) so both
// definitions of the table agree.
// NOTE(review): whether columns are resolved by name or by position is not visible from
// this file -- identical spelling is correct in either case; confirm against the scan node.
SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
        {"snapshot_name", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        {"snapshot_type", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        // Previously "created_time"; the FE table declares this column as CREATE_TIME.
        {"create_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        {"finished_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        // Previously misspelled "fe_jouranl_id" / "starmgr_jouranl_id".
        {"fe_journal_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        {"starmgr_journal_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
        {"properties", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        {"storage_volume", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
        {"storage_path", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
};
|
||
// Hands the static column table to the base scanner. The element count is derived from
// the array itself, so it follows any change made to _s_columns.
SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner()
        : SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(_s_columns[0])) {}

// Defaulted: no hand-written teardown is performed here.
SchemaClusterSnapshotsScanner::~SchemaClusterSnapshotsScanner() = default;
|
||
// Starts the scan: validates the scanner, runs the base-class start and scanner-state
// setup, then asks SchemaHelper for the cluster snapshots and stores the reply in
// _result for get_next() to iterate over.
//
// Returns InternalError if the scanner has not been initialised or if the scanner
// parameters carry no IP or port; otherwise the first failing status of the steps above.
Status SchemaClusterSnapshotsScanner::start(RuntimeState* state) {
    RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
    RETURN_IF(!(_param->ip && _param->port), Status::InternalError("IP or port not exists"));

    RETURN_IF_ERROR(SchemaScanner::start(state));
    RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));

    // The request is sent default-constructed; no field is set on it here.
    TClusterSnapshotsRequest snapshots_request;
    return SchemaHelper::get_cluster_snapshots_info(_ss_state, snapshots_request, &_result);
}
|
||
// Appends the result row at position _index to `chunk` (one value per requested slot)
// and then advances _index by one.
//
// Slot ids are used as 1-based positions into the per-row datum array. Every slot id is
// validated before anything is appended, so an unexpected id yields InternalError instead
// of an out-of-range array access, and no column is left with a partially written row.
Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {
    const auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
    const TClusterSnapshotsItem& info = _result.items[_index];

    // One datum per entry of _s_columns, in the same order. The "jouranl" spelling below
    // is the field name of TClusterSnapshotsItem as used here and must be kept as is.
    DatumArray datum_array{
            Slice(info.snapshot_name), Slice(info.snapshot_type),
            info.created_time,         info.finished_time,
            info.fe_jouranl_id,        info.starmgr_jouranl_id,
            Slice(info.properties),    Slice(info.storage_volume),
            Slice(info.storage_path),
    };

    // First pass: make sure every requested slot maps onto a known column.
    for (const auto& [slot_id, index] : slot_id_map) {
        if (slot_id < 1 || static_cast<size_t>(slot_id) > datum_array.size()) {
            return Status::InternalError("invalid slot id for cluster_snapshots");
        }
    }

    // Second pass: all ids are in range, so indexing is safe.
    for (const auto& [slot_id, index] : slot_id_map) {
        Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
        column->append_datum(datum_array[slot_id - 1]);
    }
    _index++;
    return {};
}
|
||
// Produces the next row of the fetched result, one row per call.
//
// Sets *eos to true and returns OK once every item of _result has been consumed;
// otherwise sets *eos to false and returns the status of _fill_chunk.
// Returns InternalError if the scanner is not initialised or either pointer is null.
Status SchemaClusterSnapshotsScanner::get_next(ChunkPtr* chunk, bool* eos) {
    RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
    RETURN_IF(!(chunk != nullptr && eos != nullptr), Status::InternalError("input pointer is nullptr."));

    const bool drained = _index >= _result.items.size();
    *eos = drained;
    if (drained) {
        return Status::OK();
    }
    return _fill_chunk(chunk);
}
|
||
} // namespace starrocks | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The most risky bug in this code is: You can modify the code like this:
SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
{"snapshot_name", TypeDescriptor::create_varchar_type(MAX_SNAPSHOT_NAME_LENGTH), MAX_SNAPSHOT_NAME_LENGTH, true},
{"snapshot_type", TypeDescriptor::create_varchar_type(MAX_SNAPSHOT_TYPE_LENGTH), MAX_SNAPSHOT_TYPE_LENGTH, true},
{"created_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
{"finished_time", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
{"fe_jouranl_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
{"starmgr_jouranl_id", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), true},
{"properties", TypeDescriptor::create_varchar_type(MAX_PROPERTIES_LENGTH), MAX_PROPERTIES_LENGTH, true},
{"storage_volume", TypeDescriptor::create_varchar_type(MAX_STORAGE_VOLUME_LENGTH), MAX_STORAGE_VOLUME_LENGTH, true},
{"storage_path", TypeDescriptor::create_varchar_type(MAX_STORAGE_PATH_LENGTH), MAX_STORAGE_PATH_LENGTH, true},
};
// Define appropriate constants for maximum lengths
const int MAX_SNAPSHOT_NAME_LENGTH = 256;
const int MAX_SNAPSHOT_TYPE_LENGTH = 256;
const int MAX_PROPERTIES_LENGTH = 1024;
const int MAX_STORAGE_VOLUME_LENGTH = 256;
const int MAX_STORAGE_PATH_LENGTH = 1024; Explanation: The original code uses |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
#pragma once | ||
|
||
#include "exec/schema_scanner.h" | ||
#include "gen_cpp/FrontendService_types.h" | ||
|
||
namespace starrocks { | ||
|
||
class SchemaClusterSnapshotsScanner : public SchemaScanner { | ||
public: | ||
SchemaClusterSnapshotsScanner(); | ||
~SchemaClusterSnapshotsScanner() override; | ||
Status start(RuntimeState* state) override; | ||
Status get_next(ChunkPtr* chunk, bool* eos) override; | ||
|
||
private: | ||
Status _fill_chunk(ChunkPtr* chunk); | ||
|
||
size_t _index = 0; | ||
TClusterSnapshotsResponse _result; | ||
static SchemaScanner::ColumnDesc _s_columns[]; | ||
}; | ||
|
||
} // namespace starrocks |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
package com.starrocks.catalog.system.information; | ||
|
||
import com.starrocks.catalog.PrimitiveType; | ||
import com.starrocks.catalog.ScalarType; | ||
import com.starrocks.catalog.Table; | ||
import com.starrocks.catalog.system.SystemId; | ||
import com.starrocks.catalog.system.SystemTable; | ||
import com.starrocks.thrift.TSchemaTableType; | ||
|
||
import static com.starrocks.catalog.system.SystemTable.NAME_CHAR_LEN; | ||
import static com.starrocks.catalog.system.SystemTable.builder; | ||
|
||
public class ClusterSnapshotJobsTable { | ||
public static final String NAME = "cluster_snapshot_jobs"; | ||
|
||
public static SystemTable create() { | ||
return new SystemTable(SystemId.CLUSTER_SNAPSHOT_JOBS_ID, | ||
NAME, | ||
Table.TableType.SCHEMA, | ||
builder() | ||
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("CREATE_TIME", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOT_JOBS); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
package com.starrocks.catalog.system.information; | ||
|
||
import com.starrocks.catalog.PrimitiveType; | ||
import com.starrocks.catalog.ScalarType; | ||
import com.starrocks.catalog.Table; | ||
import com.starrocks.catalog.system.SystemId; | ||
import com.starrocks.catalog.system.SystemTable; | ||
import com.starrocks.thrift.TSchemaTableType; | ||
|
||
import static com.starrocks.catalog.system.SystemTable.NAME_CHAR_LEN; | ||
import static com.starrocks.catalog.system.SystemTable.builder; | ||
|
||
public class ClusterSnapshotsTable { | ||
public static final String NAME = "cluster_snapshots"; | ||
|
||
public static SystemTable create() { | ||
return new SystemTable(SystemId.CLUSTER_SNAPSHOTS_ID, | ||
NAME, | ||
Table.TableType.SCHEMA, | ||
builder() | ||
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("CREATE_TIME", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT)) | ||
.column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("STORAGE_VOLUME", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.column("STORAGE_PATH", ScalarType.createVarchar(NAME_CHAR_LEN)) | ||
.build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOTS); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The most risky bug in this code is: You can modify the code like this:
public class ClusterSnapshotsTable {
public static final String NAME = "cluster_snapshots";
public static SystemTable create() {
return new SystemTable(SystemId.CLUSTER_SNAPSHOTS_ID,
NAME,
Table.TableType.SCHEMA,
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("SNAPSHOT_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("CREATE_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FE_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("STARMGR_JOURNAL_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("PROPERTIES", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("STORAGE_VOLUME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("STORAGE_PATH", ScalarType.createVarchar(NAME_CHAR_LEN))
.build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOTS // Ensure this value is valid for your use case.
);
}
} Ensure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The most risky bug in this code is:
Memory leak or undefined behavior due to incorrect use of `Slice` with `StringValue`.
You can modify the code like this:
In the original code, creating a `Slice` from `info.snapshot_name`, `info.state`, etc. directly assumes these values have valid `data` and `size` methods or properties, which may not be true or safe, especially if they have not been previously initialized or allocated correctly. Adjusting it as shown explicitly constructs the `Slice` with known data pointers and sizes.