Skip to content

Commit

Permalink
[Feature] Support Cluster Snapshot Backup: support system table for c…
Browse files Browse the repository at this point in the history
…luster snapshot backup (part2) (#54508)

Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch authored Jan 3, 2025
1 parent 3a35b76 commit f43707f
Show file tree
Hide file tree
Showing 17 changed files with 424 additions and 0 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ set(EXEC_FILES
schema_scanner/sys_fe_locks.cpp
schema_scanner/sys_fe_memory_usage.cpp
schema_scanner/schema_temp_tables_scanner.cpp
schema_scanner/schema_cluster_snapshots_scanner.cpp
schema_scanner/schema_cluster_snapshot_jobs_scanner.cpp
jdbc_scanner.cpp
sorting/compare_column.cpp
sorting/merge_column.cpp
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "exec/schema_scanner/schema_be_threads_scanner.h"
#include "exec/schema_scanner/schema_be_txns_scanner.h"
#include "exec/schema_scanner/schema_charsets_scanner.h"
#include "exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.h"
#include "exec/schema_scanner/schema_cluster_snapshots_scanner.h"
#include "exec/schema_scanner/schema_collations_scanner.h"
#include "exec/schema_scanner/schema_column_stats_usage_scanner.h"
#include "exec/schema_scanner/schema_columns_scanner.h"
Expand Down Expand Up @@ -219,6 +221,10 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return std::make_unique<SchemaColumnStatsUsageScanner>();
case TSchemaTableType::SCH_ANALYZE_STATUS:
return std::make_unique<SchemaAnalyzeStatus>();
case TSchemaTableType::SCH_CLUSTER_SNAPSHOTS:
return std::make_unique<SchemaClusterSnapshotsScanner>();
case TSchemaTableType::SCH_CLUSTER_SNAPSHOT_JOBS:
return std::make_unique<SchemaClusterSnapshotJobsScanner>();
default:
return std::make_unique<SchemaDummyScanner>();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.h"

#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "types/logical_type.h"

namespace starrocks {

SchemaScanner::ColumnDesc SchemaClusterSnapshotJobsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"JOB_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"STATE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"DETAIL_INFO", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"ERROR_MESSAGE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
};

SchemaClusterSnapshotJobsScanner::SchemaClusterSnapshotJobsScanner()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}

SchemaClusterSnapshotJobsScanner::~SchemaClusterSnapshotJobsScanner() = default;

Status SchemaClusterSnapshotJobsScanner::start(RuntimeState* state) {
RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
RETURN_IF(!_param->ip || !_param->port, Status::InternalError("IP or port not exists"));

RETURN_IF_ERROR(SchemaScanner::start(state));
RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));

TClusterSnapshotJobsRequest request;
return SchemaHelper::get_cluster_snapshot_jobs_info(_ss_state, request, &_result);
}

Status SchemaClusterSnapshotJobsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotJobsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), info.job_id, info.created_time,
info.finished_time, Slice(info.state), Slice(info.detail_info),
Slice(info.error_message),
};
for (const auto& [slot_id, index] : slot_id_map) {
Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
column->append_datum(datum_array[slot_id - 1]);
}
_index++;
return {};
}

Status SchemaClusterSnapshotJobsScanner::get_next(ChunkPtr* chunk, bool* eos) {
RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
RETURN_IF((nullptr == chunk || nullptr == eos), Status::InternalError("input pointer is nullptr."));

if (_index >= _result.items.size()) {
*eos = true;
return Status::OK();
}
*eos = false;
return _fill_chunk(chunk);
}

} // namespace starrocks
37 changes: 37 additions & 0 deletions be/src/exec/schema_scanner/schema_cluster_snapshot_jobs_scanner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/schema_scanner.h"
#include "gen_cpp/FrontendService_types.h"

namespace starrocks {

class SchemaClusterSnapshotJobsScanner : public SchemaScanner {
public:
SchemaClusterSnapshotJobsScanner();
~SchemaClusterSnapshotJobsScanner() override;
Status start(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

private:
Status _fill_chunk(ChunkPtr* chunk);

size_t _index = 0;
TClusterSnapshotJobsResponse _result;
static SchemaScanner::ColumnDesc _s_columns[];
};

} // namespace starrocks
83 changes: 83 additions & 0 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/schema_scanner/schema_cluster_snapshots_scanner.h"

#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "types/logical_type.h"

namespace starrocks {

SchemaScanner::ColumnDesc SchemaClusterSnapshotsScanner::_s_columns[] = {
{"SNAPSHOT_NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"SNAPSHOT_TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"CREATED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FINISHED_TIME", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"FE_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"STARMGR_JOURNAL_ID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(long), true},
{"PROPERTIES", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"STORAGE_VOLUME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
{"STORAGE_PATH", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), true},
};

SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}

SchemaClusterSnapshotsScanner::~SchemaClusterSnapshotsScanner() = default;

Status SchemaClusterSnapshotsScanner::start(RuntimeState* state) {
RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
RETURN_IF(!_param->ip || !_param->port, Status::InternalError("IP or port not exists"));

RETURN_IF_ERROR(SchemaScanner::start(state));
RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));

TClusterSnapshotsRequest request;
return SchemaHelper::get_cluster_snapshots_info(_ss_state, request, &_result);
}

Status SchemaClusterSnapshotsScanner::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TClusterSnapshotsItem& info = _result.items[_index];
DatumArray datum_array{
Slice(info.snapshot_name), Slice(info.snapshot_type), info.created_time, info.finished_time,

info.fe_jouranl_id, info.starmgr_jouranl_id, Slice(info.properties), Slice(info.storage_volume),

Slice(info.storage_path),
};
for (const auto& [slot_id, index] : slot_id_map) {
Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
column->append_datum(datum_array[slot_id - 1]);
}
_index++;
return {};
}

Status SchemaClusterSnapshotsScanner::get_next(ChunkPtr* chunk, bool* eos) {
RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
RETURN_IF((nullptr == chunk || nullptr == eos), Status::InternalError("input pointer is nullptr."));

if (_index >= _result.items.size()) {
*eos = true;
return Status::OK();
}
*eos = false;
return _fill_chunk(chunk);
}

} // namespace starrocks
37 changes: 37 additions & 0 deletions be/src/exec/schema_scanner/schema_cluster_snapshots_scanner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/schema_scanner.h"
#include "gen_cpp/FrontendService_types.h"

namespace starrocks {

class SchemaClusterSnapshotsScanner : public SchemaScanner {
public:
SchemaClusterSnapshotsScanner();
~SchemaClusterSnapshotsScanner() override;
Status start(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

private:
Status _fill_chunk(ChunkPtr* chunk);

size_t _index = 0;
TClusterSnapshotsResponse _result;
static SchemaScanner::ColumnDesc _s_columns[];
};

} // namespace starrocks
13 changes: 13 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,19 @@ Status SchemaHelper::get_analyze_status(const SchemaScannerState& state, const T
});
}

Status SchemaHelper::get_cluster_snapshots_info(const SchemaScannerState& state, const TClusterSnapshotsRequest& req,
TClusterSnapshotsResponse* res) {
return _call_rpc(state,
[&req, &res](FrontendServiceConnection& client) { client->getClusterSnapshotsInfo(*res, req); });
}

Status SchemaHelper::get_cluster_snapshot_jobs_info(const SchemaScannerState& state,
const TClusterSnapshotJobsRequest& req,
TClusterSnapshotJobsResponse* res) {
return _call_rpc(
state, [&req, &res](FrontendServiceConnection& client) { client->getClusterSnapshotJobsInfo(*res, req); });
}

void fill_data_column_with_null(Column* data_column) {
auto* nullable_column = down_cast<NullableColumn*>(data_column);
nullable_column->append_nulls(1);
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ class SchemaHelper {
static Status get_analyze_status(const SchemaScannerState& state, const TAnalyzeStatusReq& var_params,
TAnalyzeStatusRes* var_result);

static Status get_cluster_snapshots_info(const SchemaScannerState& state, const TClusterSnapshotsRequest& req,
TClusterSnapshotsResponse* res);

static Status get_cluster_snapshot_jobs_info(const SchemaScannerState& state,
const TClusterSnapshotJobsRequest& req,
TClusterSnapshotJobsResponse* res);

private:
static Status _call_rpc(const SchemaScannerState& state,
std::function<void(ClientConnection<FrontendServiceClient>&)> callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ public class SystemId {
// ==================== Statistics =========================== //
public static final long COLUMN_STATS_USAGE = 150L;
public static final long ANALYZE_STATUS = 151L;

// ================== Cluster Snapshot ======================= //
public static final long CLUSTER_SNAPSHOTS_ID = 160L;
public static final long CLUSTER_SNAPSHOT_JOBS_ID = 161L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.catalog.system.information;

import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.SystemTable;
import com.starrocks.thrift.TSchemaTableType;

import static com.starrocks.catalog.system.SystemTable.NAME_CHAR_LEN;
import static com.starrocks.catalog.system.SystemTable.builder;

public class ClusterSnapshotJobsTable {
public static final String NAME = "cluster_snapshot_jobs";

public static SystemTable create() {
return new SystemTable(SystemId.CLUSTER_SNAPSHOT_JOBS_ID,
NAME,
Table.TableType.SCHEMA,
builder()
.column("SNAPSHOT_NAME", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("JOB_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("CREATED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("FINISHED_TIME", ScalarType.createType(PrimitiveType.BIGINT))
.column("STATE", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("DETAIL_INFO", ScalarType.createVarchar(NAME_CHAR_LEN))
.column("ERROR_MESSAGE", ScalarType.createVarchar(NAME_CHAR_LEN))
.build(), TSchemaTableType.SCH_CLUSTER_SNAPSHOT_JOBS);
}
}
Loading

0 comments on commit f43707f

Please sign in to comment.