Skip to content

Commit

Permalink
feat(duplication): add a new command to shell to list duplications fo…
Browse files Browse the repository at this point in the history
…r one or multiple tables (#2165)

Based on #2164, this PR is to
introduce a new command for Pegasus shell to list the duplications for one or
multiple tables of this cluster according to provided table name pattern.

This command could list table-level, duplication-level and partition-level info for each
duplication. It could also provide summary stats for listed duplications. It could check
the progress, which is useful while we want to know if the duplication-based migrations
have been finished, or how many decrees have not been duplicated. The duplications
that have not been finished could also be listed.
  • Loading branch information
empiredan authored Dec 12, 2024
1 parent 2837b17 commit fb2ed45
Show file tree
Hide file tree
Showing 15 changed files with 577 additions and 45 deletions.
5 changes: 5 additions & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ struct duplication_app_state

// dup id => per-duplication properties
2:map<i32, duplication_entry> duplications;

3:string app_name;

// The number of partitions for this table.
4:i32 partition_count;
}

// This request is sent from client to meta.
Expand Down
1 change: 1 addition & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
}
mtp.add(std::move(tp_count));

// TODO(wangdan): use dsn::utils::output() in output_utils.h instead.
mtp.output(out, json ? tp_output_format::kJsonPretty : tp_output_format::kTabular);

return dsn::ERR_OK;
Expand Down
2 changes: 2 additions & 0 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ void meta_duplication_service::list_duplication_info(const duplication_list_requ

duplication_app_state dup_app;
dup_app.appid = app->app_id;
dup_app.app_name = app_name;
dup_app.partition_count = app->partition_count;

for (const auto &[dup_id, dup] : app->duplications) {
dup_app.duplications.emplace(dup_id, dup->to_partition_level_entry_for_list());
Expand Down
30 changes: 22 additions & 8 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -980,15 +980,15 @@ class aggregate_stats_calcs
const auto param = cmd(param_index++).str(); \
::dsn::utils::split_args(param.c_str(), container, ','); \
if (container.empty()) { \
fmt::print(stderr, \
"invalid command, '{}' should be in the form of 'val1,val2,val3' and " \
"should not be empty\n", \
param); \
SHELL_PRINTLN_ERROR( \
"invalid command, '{}' should be in the form of 'val1,val2,val3' and " \
"should not be empty", \
param); \
return false; \
} \
std::set<std::string> str_set(container.begin(), container.end()); \
if (str_set.size() != container.size()) { \
fmt::print(stderr, "invalid command, '{}' has duplicate values\n", param); \
SHELL_PRINTLN_ERROR("invalid command, '{}' has duplicate values", param); \
return false; \
} \
} while (false)
Expand All @@ -1005,7 +1005,7 @@ class aggregate_stats_calcs
do { \
const auto param = cmd(param_index++).str(); \
if (!::dsn::buf2uint32(param, value)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", param); \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned integer", param); \
return false; \
} \
} while (false)
Expand All @@ -1019,7 +1019,7 @@ class aggregate_stats_calcs
do { \
const auto param = cmd(__VA_ARGS__, (def_val)).str(); \
if (!::dsn::buf2uint32(param, value)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", param); \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned integer", param); \
return false; \
} \
} while (false)
Expand All @@ -1034,13 +1034,27 @@ class aggregate_stats_calcs
for (const auto &str : strs) { \
uint32_t v; \
if (!::dsn::buf2uint32(str, v)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", str); \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned integer", str); \
return false; \
} \
container.insert(v); \
} \
} while (false)

// Parse enum value from the parameters of command line.
#define PARSE_OPT_ENUM(enum_val, invalid_val, ...) \
do { \
const std::string __str(cmd(__VA_ARGS__, "").str()); \
if (!__str.empty()) { \
const auto &__val = enum_from_string(__str.c_str(), invalid_val); \
if (__val == invalid_val) { \
SHELL_PRINTLN_ERROR("invalid enum: '{}'", __str); \
return false; \
} \
enum_val = __val; \
} \
} while (false)

#define RETURN_FALSE_IF_NOT(expr, ...) \
do { \
if (dsn_unlikely(!(expr))) { \
Expand Down
1 change: 1 addition & 0 deletions src/shell/command_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "command_utils.h"

#include <fmt/core.h>
#include <memory>

#include "client/replication_ddl_client.h"
Expand Down
44 changes: 29 additions & 15 deletions src/shell/command_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

#pragma once

#include <fmt/core.h>
#include <stdio.h>
#include <cstdio>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>

#include "shell/argh.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/ports.h"
#include "utils/strings.h"

Expand All @@ -36,35 +38,47 @@ class host_port;

struct shell_context;

inline bool validate_cmd(const argh::parser &cmd,
const std::set<std::string> &params,
const std::set<std::string> &flags)
// Check if positional arguments are empty, and they should also be empty, which means only
// parameters and flags are needed.
inline dsn::error_s empty_pos_args(const argh::parser &cmd)
{
if (cmd.size() > 1) {
fmt::print(stderr, "too many params!\n");
return false;
if (cmd.size() > 0) {
return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "there shouldn't be any positional arguments");
}

return dsn::error_s::ok();
}

// Check if the positional arguments are valid, and the parameters and flags are in the given set.
inline dsn::error_s
validate_cmd(const argh::parser &cmd,
const std::set<std::string> &params,
const std::set<std::string> &flags,
std::function<dsn::error_s(const argh::parser &cmd)> pos_args_checker)
{
const auto &result = pos_args_checker(cmd);
if (!result) {
return result;
}

for (const auto &param : cmd.params()) {
if (params.find(param.first) == params.end()) {
fmt::print(stderr, "unknown param {} = {}\n", param.first, param.second);
return false;
return FMT_ERR(
dsn::ERR_INVALID_PARAMETERS, "unknown param {} = {}", param.first, param.second);
}
}

for (const auto &flag : cmd.flags()) {
if (params.find(flag) != params.end()) {
fmt::print(stderr, "missing value of {}\n", flag);
return false;
return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "missing value of {}", flag);
}

if (flags.find(flag) == flags.end()) {
fmt::print(stderr, "unknown flag {}\n", flag);
return false;
return FMT_ERR(dsn::ERR_INVALID_PARAMETERS, "unknown flag\n", flag);
}
}

return true;
return dsn::error_s::ok();
}

bool validate_ip(shell_context *sc,
Expand Down
2 changes: 2 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ bool add_dup(command_executor *e, shell_context *sc, arguments args);

bool query_dup(command_executor *e, shell_context *sc, arguments args);

bool ls_dups(command_executor *e, shell_context *sc, arguments args);

bool remove_dup(command_executor *e, shell_context *sc, arguments args);

bool start_dup(command_executor *e, shell_context *sc, arguments args);
Expand Down
30 changes: 18 additions & 12 deletions src/shell/commands/detect_hotkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "shell/command_utils.h"
#include "shell/commands.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/string_conv.h"
#include "utils/strings.h"

Expand Down Expand Up @@ -72,19 +73,24 @@ bool detect_hotkey(command_executor *e, shell_context *sc, arguments args)
// detect_hotkey
// <-a|--app_id str><-p|--partition_index num><-t|--hotkey_type read|write>
// <-c|--detect_action start|stop|query><-d|--address str>
const std::set<std::string> params = {"a",
"app_id",
"p",
"partition_index",
"c",
"hotkey_action",
"t",
"hotkey_type",
"d",
"address"};
const std::set<std::string> flags = {};
static const std::set<std::string> params = {"a",
"app_id",
"p",
"partition_index",
"c",
"hotkey_action",
"t",
"hotkey_type",
"d",
"address"};
static const std::set<std::string> flags = {};

argh::parser cmd(args.argc, args.argv, argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
if (!validate_cmd(cmd, params, flags)) {

const auto &check = validate_cmd(cmd, params, flags, empty_pos_args);
if (!check) {
// TODO(wangdan): use SHELL_PRINT* macros instead.
fmt::print(stderr, "{}\n", check.description());
return false;
}

Expand Down
Loading

0 comments on commit fb2ed45

Please sign in to comment.